You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2016/07/07 12:44:50 UTC

lucene-solr:master: SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient. (Marvin Justice, Christine Poerschke)

Repository: lucene-solr
Updated Branches:
  refs/heads/master f61a5f27d -> 976079a8e


SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient. (Marvin Justice, Christine Poerschke)


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/976079a8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/976079a8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/976079a8

Branch: refs/heads/master
Commit: 976079a8ee8a2cff1c8df01ae9f2856b3ddcdac3
Parents: f61a5f2
Author: Christine Poerschke <cp...@apache.org>
Authored: Tue Jul 5 17:14:47 2016 +0100
Committer: Christine Poerschke <cp...@apache.org>
Committed: Thu Jul 7 10:03:21 2016 +0100

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../solr/client/solrj/impl/CloudSolrClient.java | 129 ++++++++++++++++---
 .../client/solrj/request/UpdateRequest.java     |   6 +
 .../solrj/impl/CloudSolrClientBuilderTest.java  |  10 ++
 .../client/solrj/impl/CloudSolrClientTest.java  |  24 +++-
 .../java/org/apache/solr/SolrTestCaseJ4.java    |  56 +++++++-
 6 files changed, 203 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 95fa796..40add1d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -88,6 +88,9 @@ New Features
 * SOLR-9243: Add terms.list parameter to the TermsComponent to fetch the docFreq for a list of terms
   (Joel Bernstein)
 
+* SOLR-9090: Add directUpdatesToLeadersOnly flag to solrj CloudSolrClient.
+  (Marvin Justice, Christine Poerschke)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index f5e18b3..876f7f8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -53,6 +53,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.ToleratedUpdateError;
 import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
@@ -117,6 +118,7 @@ public class CloudSolrClient extends SolrClient {
   Random rand = new Random();
   
   private final boolean updatesToLeaders;
+  private final boolean directUpdatesToLeadersOnly;
   private boolean parallelUpdates = true;
   private ExecutorService threadPool = ExecutorUtil
       .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
@@ -206,6 +208,7 @@ public class CloudSolrClient extends SolrClient {
       this.lbClient.setRequestWriter(new BinaryRequestWriter());
       this.lbClient.setParser(new BinaryResponseParser());
       this.updatesToLeaders = true;
+      this.directUpdatesToLeadersOnly = false;
       shutdownLBHttpSolrServer = true;
       lbClient.addQueryParams(STATE_VERSION);
   }
@@ -242,6 +245,7 @@ public class CloudSolrClient extends SolrClient {
     this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
     this.lbClient = createLBHttpSolrClient(myClient);
     this.updatesToLeaders = true;
+    this.directUpdatesToLeadersOnly = false;
     shutdownLBHttpSolrServer = true;
     lbClient.addQueryParams(STATE_VERSION);
   }
@@ -299,6 +303,7 @@ public class CloudSolrClient extends SolrClient {
     this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
     this.lbClient = createLBHttpSolrClient(myClient);
     this.updatesToLeaders = true;
+    this.directUpdatesToLeadersOnly = false;
     shutdownLBHttpSolrServer = true;
   }
   
@@ -329,8 +334,38 @@ public class CloudSolrClient extends SolrClient {
    */
   @Deprecated
   public CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient, boolean updatesToLeaders) {
+    this(zkHosts, chroot, httpClient, lbSolrClient, updatesToLeaders, false);
+  }
+
+  /**
+   * Create a new client object that connects to Zookeeper and is always aware
+   * of the SolrCloud state. If there is a fully redundant Zookeeper quorum and
+   * SolrCloud has enough replicas for every shard in a collection, there is no
+   * single point of failure. Updates will be sent to shard leaders by default.
+   *
+   * @param zkHosts
+   *          A Java Collection (List, Set, etc) of HOST:PORT strings, one for
+   *          each host in the zookeeper ensemble. Note that with certain
+   *          Collection types like HashSet, the order of hosts in the final
+   *          connect string may not be in the same order you added them.
+   * @param chroot
+   *          A chroot value for zookeeper, starting with a forward slash. If no
+   *          chroot is required, use null.
+   * @param httpClient
+   *          the {@link HttpClient} instance to be used for all requests. The provided httpClient should use a
+   *          multi-threaded connection manager.  If null, a default HttpClient will be used.
+   * @param lbSolrClient
+   *          LBHttpSolrServer instance for requests.  If null, a default HttpClient will be used.
+   * @param updatesToLeaders
+   *          If true, sends updates to shard leaders.
+   * @param directUpdatesToLeadersOnly
+   *          If true, sends direct updates to shard leaders only.
+   */
+  private CloudSolrClient(Collection<String> zkHosts, String chroot, HttpClient httpClient, LBHttpSolrClient lbSolrClient,
+      boolean updatesToLeaders, boolean directUpdatesToLeadersOnly) {
     this.zkHost = buildZkHostString(zkHosts, chroot);
     this.updatesToLeaders = updatesToLeaders;
+    this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
     
     this.clientIsInternal = httpClient == null;
     this.myClient = httpClient == null ? HttpClientUtil.createClient(null) : httpClient;
@@ -374,6 +409,7 @@ public class CloudSolrClient extends SolrClient {
     this.lbClient.setRequestWriter(new BinaryRequestWriter());
     this.lbClient.setParser(new BinaryResponseParser());
     this.updatesToLeaders = updatesToLeaders;
+    this.directUpdatesToLeadersOnly = false;
     shutdownLBHttpSolrServer = true;
     lbClient.addQueryParams(STATE_VERSION);
   }
@@ -414,6 +450,7 @@ public class CloudSolrClient extends SolrClient {
     this.zkHost = zkHost;
     this.lbClient = lbClient;
     this.updatesToLeaders = updatesToLeaders;
+    this.directUpdatesToLeadersOnly = false;
     shutdownLBHttpSolrServer = false;
     this.clientIsInternal = false;
     lbClient.addQueryParams(STATE_VERSION);
@@ -648,15 +685,18 @@ public class CloudSolrClient extends SolrClient {
     //Create the URL map, which is keyed on slice name.
     //The value is a list of URLs for each replica in the slice.
     //The first value in the list is the leader for the slice.
-    Map<String,List<String>> urlMap = buildUrlMap(col);
-    if (urlMap == null) {
-      // we could not find a leader yet - use unoptimized general path
-      return null;
-    }
-
-    Map<String, LBHttpSolrClient.Req> routes = updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField);
+    final Map<String,List<String>> urlMap = buildUrlMap(col);
+    final Map<String, LBHttpSolrClient.Req> routes = (urlMap == null ? null : updateRequest.getRoutes(router, col, urlMap, routableParams, this.idField));
     if (routes == null) {
-      return null;
+      if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
+          // we have info (documents with ids and/or ids to delete) with
+          // which to find the leaders but we could not find (all of) them
+          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+              "directUpdatesToLeadersOnly==true but could not find leader(s)");
+      } else {
+        // we could not find a leader or routes yet - use unoptimized general path
+        return null;
+      }
     }
 
     final NamedList<Throwable> exceptions = new NamedList<>();
@@ -764,21 +804,23 @@ public class CloudSolrClient extends SolrClient {
       List<String> urls = new ArrayList<>();
       Replica leader = slice.getLeader();
       if (leader == null) {
+        if (directUpdatesToLeadersOnly) {
+          continue;
+        }
         // take unoptimized general path - we cannot find a leader yet
         return null;
       }
       ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
       String url = zkProps.getCoreUrl();
       urls.add(url);
-      Collection<Replica> replicas = slice.getReplicas();
-      Iterator<Replica> replicaIterator = replicas.iterator();
-      while (replicaIterator.hasNext()) {
-        Replica replica = replicaIterator.next();
-        if (!replica.getNodeName().equals(leader.getNodeName()) &&
-            !replica.getName().equals(leader.getName())) {
-          ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
-          String url1 = zkProps1.getCoreUrl();
-          urls.add(url1);
+      if (!directUpdatesToLeadersOnly) {
+        for (Replica replica : slice.getReplicas()) {
+          if (!replica.getNodeName().equals(leader.getNodeName()) &&
+              !replica.getName().equals(leader.getName())) {
+            ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
+            String url1 = zkProps1.getCoreUrl();
+            urls.add(url1);
+          }
         }
       }
       urlMap.put(name, urls);
@@ -1284,6 +1326,13 @@ public class CloudSolrClient extends SolrClient {
     return updatesToLeaders;
   }
 
+  /**
+   * @return true if direct updates are sent to shard leaders only
+   */
+  public boolean isDirectUpdatesToLeadersOnly() {
+    return directUpdatesToLeadersOnly;
+  }
+
   /**If caches are expired they are refreshed after acquiring a lock.
    * use this to set the number of locks
    */
@@ -1417,6 +1466,31 @@ public class CloudSolrClient extends SolrClient {
     this.lbClient.setSoTimeout(timeout);
   }
 
+  private static boolean hasInfoToFindLeaders(UpdateRequest updateRequest, String idField) {
+    final Map<SolrInputDocument,Map<String,Object>> documents = updateRequest.getDocumentsMap();
+    final Map<String,Map<String,Object>> deleteById = updateRequest.getDeleteByIdMap();
+
+    final boolean hasNoDocuments = (documents == null || documents.isEmpty());
+    final boolean hasNoDeleteById = (deleteById == null || deleteById.isEmpty());
+    if (hasNoDocuments && hasNoDeleteById) {
+      // no documents and no delete-by-id, so no info to find leader(s)
+      return false;
+    }
+
+    if (documents != null) {
+      for (final Map.Entry<SolrInputDocument,Map<String,Object>> entry : documents.entrySet()) {
+        final SolrInputDocument doc = entry.getKey();
+        final Object fieldValue = doc.getFieldValue(idField);
+        if (fieldValue == null) {
+          // a document with no id field value, so can't find leader for it
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
   private static LBHttpSolrClient createLBHttpSolrClient(HttpClient httpClient) {
     final LBHttpSolrClient lbClient = new LBHttpSolrClient.Builder()
         .withHttpClient(httpClient)
@@ -1466,6 +1540,7 @@ public class CloudSolrClient extends SolrClient {
     private String zkChroot;
     private LBHttpSolrClient loadBalancedSolrClient;
     private boolean shardLeadersOnly;
+    private boolean directUpdatesToLeadersOnly;
     
     public Builder() {
       this.zkHosts = new ArrayList();
@@ -1543,10 +1618,28 @@ public class CloudSolrClient extends SolrClient {
     }
 
     /**
+     * Tells {@link Builder} that created clients should send direct updates to shard leaders only.
+     */
+    public Builder sendDirectUpdatesToShardLeadersOnly() {
+      directUpdatesToLeadersOnly = true;
+      return this;
+    }
+
+    /**
+     * Tells {@link Builder} that created clients can send updates
+     * to any shard replica (shard leaders and non-leaders).
+     */
+    public Builder sendDirectUpdatesToAnyShardReplica() {
+      directUpdatesToLeadersOnly = false;
+      return this;
+    }
+
+    /**
      * Create a {@link CloudSolrClient} based on the provided configuration.
      */
     public CloudSolrClient build() {
-      return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient, shardLeadersOnly);
+      return new CloudSolrClient(zkHosts, zkChroot, httpClient, loadBalancedSolrClient,
+          shardLeadersOnly, directUpdatesToLeadersOnly);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
index f93a197..aec6e22 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/UpdateRequest.java
@@ -257,6 +257,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
           return null;
         }
         List<String> urls = urlMap.get(slice.getName());
+        if (urls == null) {
+          return null;
+        }
         String leaderUrl = urls.get(0);
         LBHttpSolrClient.Req request = (LBHttpSolrClient.Req) routes
             .get(leaderUrl);
@@ -305,6 +308,9 @@ public class UpdateRequest extends AbstractUpdateRequest {
           return null;
         }
         List<String> urls = urlMap.get(slice.getName());
+        if (urls == null) {
+          return null;
+        }
         String leaderUrl = urls.get(0);
         LBHttpSolrClient.Req request = routes.get(leaderUrl);
         if (request != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java
index 57692c7..5f1c5c2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientBuilderTest.java
@@ -87,4 +87,14 @@ public class CloudSolrClientBuilderTest extends LuceneTestCase {
       assertTrue(createdClient.isUpdatesToLeaders() == true);
     }
   }
+
+  @Test
+  public void testIsDirectUpdatesToLeadersOnlyDefault() throws IOException {
+    try(CloudSolrClient createdClient = new Builder()
+        .withZkHost(ANY_ZK_HOST)
+        .withZkChroot(ANY_CHROOT)
+        .build()) {
+      assertFalse(createdClient.isDirectUpdatesToLeadersOnly());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 616ddc4..cf12036 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -149,6 +149,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     
     // Test single threaded routed updates for UpdateRequest
     NamedList<Object> response = cluster.getSolrClient().request(request, COLLECTION);
+    if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
+      checkSingleServer(response);
+    }
     CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
     Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
     Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it = routes.entrySet()
@@ -173,10 +176,13 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     
     // Test the deleteById routing for UpdateRequest
     
-    new UpdateRequest()
+    final UpdateResponse uResponse = new UpdateRequest()
         .deleteById("0")
         .deleteById("2")
         .commit(cluster.getSolrClient(), COLLECTION);
+    if (cluster.getSolrClient().isDirectUpdatesToLeadersOnly()) {
+      checkSingleServer(uResponse.getResponse());
+    }
 
     QueryResponse qResponse = cluster.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
     SolrDocumentList docs = qResponse.getResults();
@@ -187,6 +193,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
       threadedClient.setParallelUpdates(true);
       threadedClient.setDefaultCollection(COLLECTION);
       response = threadedClient.request(request);
+      if (threadedClient.isDirectUpdatesToLeadersOnly()) {
+        checkSingleServer(response);
+      }
       rr = (CloudSolrClient.RouteResponse) response;
       routes = rr.getRoutes();
       it = routes.entrySet()
@@ -540,4 +549,17 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
       HttpClientUtil.close(client);
     }
   }
+
+  private static void checkSingleServer(NamedList<Object> response) {
+    final CloudSolrClient.RouteResponse rr = (CloudSolrClient.RouteResponse) response;
+    final Map<String,LBHttpSolrClient.Req> routes = rr.getRoutes();
+    final Iterator<Map.Entry<String,LBHttpSolrClient.Req>> it =
+        routes.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String,LBHttpSolrClient.Req> entry = it.next();
+        assertEquals("wrong number of servers: "+entry.getValue().getServers(),
+            1, entry.getValue().getServers().size());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/976079a8/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 7a41454..ea70805 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -2041,11 +2041,55 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
     return (0 == TestUtil.nextInt(random(), 0, 9)) ? unlikely : likely;
   }
   
+  public static class CloudSolrClientBuilder extends CloudSolrClient.Builder {
+
+    private boolean configuredDUTflag = false;
+
+    public CloudSolrClientBuilder() {
+      super();
+    }
+
+    @Override
+    public CloudSolrClient.Builder sendDirectUpdatesToShardLeadersOnly() {
+      configuredDUTflag = true;
+      return super.sendDirectUpdatesToShardLeadersOnly();
+    }
+
+    @Override
+    public CloudSolrClient.Builder sendDirectUpdatesToAnyShardReplica() {
+      configuredDUTflag = true;
+      return super.sendDirectUpdatesToAnyShardReplica();
+    }
+
+    private void randomlyChooseDirectUpdatesToLeadersOnly() {
+      if (random().nextBoolean()) {
+        sendDirectUpdatesToShardLeadersOnly();
+      } else {
+        sendDirectUpdatesToAnyShardReplica();
+      }
+    }
+
+    @Override
+    public CloudSolrClient build() {
+      if (configuredDUTflag == false) {
+        // flag value not explicity configured
+        if (random().nextBoolean()) {
+          // so randomly choose a value
+          randomlyChooseDirectUpdatesToLeadersOnly();
+        } else {
+          // or go with whatever the default value is
+          configuredDUTflag = true;
+        }
+      }
+      return super.build();
+    }
+  }
+
   public static CloudSolrClient getCloudSolrClient(String zkHost) {
     if (random().nextBoolean()) {
       return new CloudSolrClient(zkHost);
     }
-    return new CloudSolrClient.Builder()
+    return new CloudSolrClientBuilder()
         .withZkHost(zkHost)
         .build();
   }
@@ -2054,7 +2098,7 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
     if (random().nextBoolean()) {
       return new CloudSolrClient(zkHost, httpClient);
     }
-    return new CloudSolrClient.Builder()
+    return new CloudSolrClientBuilder()
         .withZkHost(zkHost)
         .withHttpClient(httpClient)
         .build();
@@ -2066,12 +2110,12 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
     }
     
     if (shardLeadersOnly) {
-      return new CloudSolrClient.Builder()
+      return new CloudSolrClientBuilder()
           .withZkHost(zkHost)
           .sendUpdatesOnlyToShardLeaders()
           .build();
     }
-    return new CloudSolrClient.Builder()
+    return new CloudSolrClientBuilder()
         .withZkHost(zkHost)
         .sendUpdatesToAllReplicasInShard()
         .build();
@@ -2083,13 +2127,13 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
     }
     
     if (shardLeadersOnly) {
-      return new CloudSolrClient.Builder()
+      return new CloudSolrClientBuilder()
           .withZkHost(zkHost)
           .withHttpClient(httpClient)
           .sendUpdatesOnlyToShardLeaders()
           .build();
     }
-    return new CloudSolrClient.Builder()
+    return new CloudSolrClientBuilder()
         .withZkHost(zkHost)
         .withHttpClient(httpClient)
         .sendUpdatesToAllReplicasInShard()