You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2017/05/05 00:13:18 UTC

[2/3] lucene-solr:jira/solr-10233: RTG requests are forwarded to REALTIME replicas when distrib=true

RTG requests are forwarded to REALTIME replicas when distrib=true


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3d49a6ea
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3d49a6ea
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3d49a6ea

Branch: refs/heads/jira/solr-10233
Commit: 3d49a6ea3cd4a896d6734bccf9a45ff497937761
Parents: a1421ea
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Thu May 4 17:10:19 2017 -0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Thu May 4 17:10:19 2017 -0700

----------------------------------------------------------------------
 .../apache/solr/handler/RealTimeGetHandler.java | 18 +++++--
 .../handler/component/HttpShardHandler.java     | 29 ++++++++++-
 .../handler/component/RealTimeGetComponent.java | 29 +++++++----
 .../apache/solr/cloud/TestAppendReplica.java    | 54 ++++++++++++++++----
 .../apache/solr/cloud/TestPassiveReplica.java   | 50 +++++++++++++++---
 .../java/org/apache/solr/SolrTestCaseJ4.java    | 12 ++++-
 6 files changed, 158 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d49a6ea/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java b/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
index bce374f..247b65c 100644
--- a/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
@@ -16,14 +16,17 @@
  */
 package org.apache.solr.handler;
 
-import org.apache.solr.api.Api;
-import org.apache.solr.api.ApiBag;
-import org.apache.solr.handler.component.*;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.solr.api.Api;
+import org.apache.solr.api.ApiBag;
+import org.apache.solr.handler.component.RealTimeGetComponent;
+import org.apache.solr.handler.component.SearchHandler;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
 
 public class RealTimeGetHandler extends SearchHandler {
   @Override
@@ -33,6 +36,13 @@ public class RealTimeGetHandler extends SearchHandler {
     names.add(RealTimeGetComponent.COMPONENT_NAME);
     return names;
   }
+  
+  
+  @Override
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    req.getContext().put("distribOnlyRealtime", Boolean.TRUE);
+    super.handleRequestBody(req, rsp);
+  }
 
   //////////////////////// SolrInfoMBeans methods //////////////////////
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d49a6ea/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 8c0a9cb..f2d2639 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -349,9 +349,12 @@ public class HttpShardHandler extends ShardHandler {
       // and make it a non-distributed request.
       String ourSlice = cloudDescriptor.getShardId();
       String ourCollection = cloudDescriptor.getCollectionName();
+      // Some requests may only be fulfilled by replicas of type Replica.Type.REALTIME
+      boolean onlyRealtimeReplicas = Boolean.TRUE == req.getContext().get("distribOnlyRealtime");
       if (rb.slices.length == 1 && rb.slices[0] != null
           && ( rb.slices[0].equals(ourSlice) || rb.slices[0].equals(ourCollection + "_" + ourSlice) )  // handle the <collection>_<slice> format
-          && cloudDescriptor.getLastPublished() == Replica.State.ACTIVE) {
+          && cloudDescriptor.getLastPublished() == Replica.State.ACTIVE
+          && (!onlyRealtimeReplicas || cloudDescriptor.getReplicaType() == Replica.Type.REALTIME)) {
         boolean shortCircuit = params.getBool("shortCircuit", true);       // currently just a debugging parameter to check distrib search on a single node
 
         String targetHandler = params.get(ShardParams.SHARDS_QT);
@@ -387,14 +390,36 @@ public class HttpShardHandler extends ShardHandler {
             continue;
             // throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "no such shard: " + sliceName);
           }
+          Replica shardLeader = null;
 
           final Collection<Replica> allSliceReplicas = slice.getReplicasMap().values();
           final List<Replica> eligibleSliceReplicas = new ArrayList<>(allSliceReplicas.size());
           for (Replica replica : allSliceReplicas) {
             if (!clusterState.liveNodesContain(replica.getNodeName())
-                || replica.getState() != Replica.State.ACTIVE) {
+                || replica.getState() != Replica.State.ACTIVE
+                || (onlyRealtimeReplicas && replica.getType() == Replica.Type.PASSIVE)) {
               continue;
             }
+            
+            if (onlyRealtimeReplicas && replica.getType() == Replica.Type.APPEND) {
+              if (shardLeader == null) {
+                try {
+                  shardLeader = zkController.getZkStateReader().getLeaderRetry(cloudDescriptor.getCollectionName(), slice.getName());
+                } catch (InterruptedException e) {
+                  throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + slice.getName() + " in collection " 
+                      + cloudDescriptor.getCollectionName(), e);
+                } catch (SolrException e) {
+                  if (log.isDebugEnabled()) {
+                    log.debug("Exception finding leader for shard {} in collection {}. Collection State: {}", 
+                        slice.getName(), cloudDescriptor.getCollectionName(), zkController.getZkStateReader().getClusterState().getCollectionOrNull(cloudDescriptor.getCollectionName()));
+                  }
+                  throw e;
+                }
+              }
+              if (!replica.getName().equals(shardLeader.getName())) {
+                continue;
+              }
+            }
             eligibleSliceReplicas.add(replica);
           }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d49a6ea/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index d785868..18e202e 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -16,6 +16,10 @@
  */
 package org.apache.solr.handler.component;
 
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.ID;
+import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
+
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -24,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,9 +73,9 @@ import org.apache.solr.schema.FieldType;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.DocList;
-import org.apache.solr.search.SolrDocumentFetcher;
 import org.apache.solr.search.QParser;
 import org.apache.solr.search.ReturnFields;
+import org.apache.solr.search.SolrDocumentFetcher;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.search.SolrReturnFields;
 import org.apache.solr.search.SyntaxError;
@@ -82,10 +87,6 @@ import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CommonParams.DISTRIB;
-import static org.apache.solr.common.params.CommonParams.ID;
-import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
-
 public class RealTimeGetComponent extends SearchComponent
 {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -105,11 +106,19 @@ public class RealTimeGetComponent extends SearchComponent
     SolrQueryRequest req = rb.req;
     SolrQueryResponse rsp = rb.rsp;
     SolrParams params = req.getParams();
-
-    if (req.getCore().getCoreDescriptor().getCloudDescriptor() != null 
-        && !req.getCore().getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) {
-      //nocommit: forward request to leader
-      return;
+    CloudDescriptor cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
+
+    if (cloudDesc != null) {
+      Replica.Type replicaType = cloudDesc.getReplicaType();
+      if (replicaType != null) {
+        if (replicaType == Replica.Type.PASSIVE) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, 
+              String.format(Locale.ROOT, "%s can't handle realtime get requests. Replicas of type %s do not support these type of requests", 
+                  cloudDesc.getCoreNodeName(),
+                  Replica.Type.PASSIVE));
+        } 
+        // non-leader APPEND replicas should not respond to distrib /get requests, but internal requests are OK
+      }
     }
     
     if (!params.getBool(COMPONENT_NAME, true)) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d49a6ea/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java
index a7f3042..b56c5a7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestAppendReplica.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.http.client.HttpClient;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrClient;
@@ -41,6 +42,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.CollectionStatePredicate;
@@ -255,16 +257,48 @@ public class TestAppendReplica extends SolrCloudTestCase {
     doReplaceLeader(false);
   }
   
-  public void testPassiveReplicaStates() {
-    // Validate that passive replicas go through the correct states when starting, stopping, reconnecting
-  }
-  
-  public void testPassiveReplicaCantConnectToZooKeeper() {
-    
-  }
-  
-  public void testRealTimeGet() {
-    // should be redirected to writers or error
+  public void testRealTimeGet() throws SolrServerException, IOException, KeeperException, InterruptedException {
+    // should be redirected to Replica.Type.REALTIME
+    int numReplicas = random().nextBoolean()?1:2;
+    int numRealtimeReplicas = random().nextBoolean()?0:2;
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, numRealtimeReplicas, numReplicas, 0)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+    waitForState("Unexpected replica count", collectionName, activeReplicaCount(numRealtimeReplicas, numReplicas, 0));
+    DocCollection docCollection = assertNumberOfReplicas(numRealtimeReplicas, numReplicas, 0, false, true);
+    HttpClient httpClient = cluster.getSolrClient().getHttpClient();
+    int id = 0;
+    Slice slice = docCollection.getSlice("shard1");
+    List<String> ids = new ArrayList<>(slice.getReplicas().size());
+    for (Replica rAdd:slice.getReplicas()) {
+      try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) {
+        client.add(new SolrInputDocument("id", String.valueOf(id), "foo_s", "bar"));
+      }
+      SolrDocument docCloudClient = cluster.getSolrClient().getById(collectionName, String.valueOf(id));
+      assertEquals("bar", docCloudClient.getFieldValue("foo_s"));
+      for (Replica rGet:slice.getReplicas()) {
+        try (HttpSolrClient client = getHttpSolrClient(rGet.getCoreUrl(), httpClient)) {
+          SolrDocument doc = client.getById(String.valueOf(id));
+          assertEquals("bar", doc.getFieldValue("foo_s"));
+        }
+      }
+      ids.add(String.valueOf(id));
+      id++;
+    }
+    SolrDocumentList previousAllIdsResult = null;
+    for (Replica rAdd:slice.getReplicas()) {
+      try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) {
+        SolrDocumentList allIdsResult = client.getById(ids);
+        if (previousAllIdsResult != null) {
+          assertTrue(compareSolrDocumentList(previousAllIdsResult, allIdsResult));
+        } else {
+          // set the first response here
+          previousAllIdsResult = allIdsResult;
+          assertEquals("Unexpected number of documents", ids.size(), allIdsResult.getNumFound());
+        }
+      }
+      id++;
+    }
   }
   
   /*

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d49a6ea/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java
index e158a92..d087c19 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.http.client.HttpClient;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -36,6 +37,8 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.CollectionStatePredicate;
@@ -289,8 +292,47 @@ public class TestPassiveReplica extends SolrCloudTestCase {
     assertEquals("Expecting DOWN->RECOVERING->ACTIVE but saw: " + Arrays.toString(statesSeen.toArray()), Replica.State.ACTIVE, statesSeen.get(0));
   }
   
-  public void testRealTimeGet() {
-    // should be redirected to writers
+  public void testRealTimeGet() throws SolrServerException, IOException, KeeperException, InterruptedException {
+    // should be redirected to Replica.Type.REALTIME
+    int numReplicas = random().nextBoolean()?1:2;
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, numReplicas, 0, numReplicas)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+    waitForState("Unexpected replica count", collectionName, activeReplicaCount(numReplicas, 0, numReplicas));
+    DocCollection docCollection = assertNumberOfReplicas(numReplicas, 0, numReplicas, false, true);
+    HttpClient httpClient = cluster.getSolrClient().getHttpClient();
+    int id = 0;
+    Slice slice = docCollection.getSlice("shard1");
+    List<String> ids = new ArrayList<>(slice.getReplicas().size());
+    for (Replica rAdd:slice.getReplicas()) {
+      try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) {
+        client.add(new SolrInputDocument("id", String.valueOf(id), "foo_s", "bar"));
+      }
+      SolrDocument docCloudClient = cluster.getSolrClient().getById(collectionName, String.valueOf(id));
+      assertEquals("bar", docCloudClient.getFieldValue("foo_s"));
+      for (Replica rGet:slice.getReplicas()) {
+        try (HttpSolrClient client = getHttpSolrClient(rGet.getCoreUrl(), httpClient)) {
+          SolrDocument doc = client.getById(String.valueOf(id));
+          assertEquals("bar", doc.getFieldValue("foo_s"));
+        }
+      }
+      ids.add(String.valueOf(id));
+      id++;
+    }
+    SolrDocumentList previousAllIdsResult = null;
+    for (Replica rAdd:slice.getReplicas()) {
+      try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) {
+        SolrDocumentList allIdsResult = client.getById(ids);
+        if (previousAllIdsResult != null) {
+          assertTrue(compareSolrDocumentList(previousAllIdsResult, allIdsResult));
+        } else {
+          // set the first response here
+          previousAllIdsResult = allIdsResult;
+          assertEquals("Unexpected number of documents", ids.size(), allIdsResult.getNumFound());
+        }
+      }
+      id++;
+    }
   }
   
   /*
@@ -418,10 +460,6 @@ public class TestPassiveReplica extends SolrCloudTestCase {
     waitForNumDocsInAllActiveReplicas(2);
   }
   
-  public void testAddDocsToPassive() {
-    
-  }
-  
   public void testSearchWhileReplicationHappens() {
       
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d49a6ea/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 7e443bb..0489ab0 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -2121,9 +2121,17 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
     SolrDocumentList list1 = (SolrDocumentList) expected;
     SolrDocumentList list2 = (SolrDocumentList) actual;
 
-    if(Float.compare(list1.getMaxScore(), list2.getMaxScore()) != 0 || list1.getNumFound() != list2.getNumFound() ||
-        list1.getStart() != list2.getStart()) {
+    if (list1.getMaxScore() == null) {
+      if (list2.getMaxScore() != null) {
+        return false;
+      } 
+    } else if (list2.getMaxScore() == null) {
       return false;
+    } else {
+      if (Float.compare(list1.getMaxScore(), list2.getMaxScore()) != 0 || list1.getNumFound() != list2.getNumFound() ||
+          list1.getStart() != list2.getStart()) {
+        return false;
+      }
     }
     for(int i=0; i<list1.getNumFound(); i++) {
       if(!compareSolrDocument(list1.get(i), list2.get(i))) {