You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@lucene.apache.org by no...@apache.org on 2015/02/26 14:04:29 UTC

svn commit: r1662439 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/servlet/ solrj/src/java/org/apache/solr/client/solrj/impl/ solrj/src/java/org/apache/solr/common/cloud/ solrj/src/test/org/apache/solr/client/solrj/impl/

Author: noble
Date: Thu Feb 26 13:04:28 2015
New Revision: 1662439

URL: http://svn.apache.org/r1662439
Log:
SOLR-7130: Make stale state notification work without failing the requests

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Feb 26 13:04:28 2015
@@ -208,6 +208,9 @@ Other Changes
 * SOLR-7160: Rename ConfigSolr to NodeConfig, and decouple it from xml
   representation (Alan Woodward)
 
+* SOLR-7130: Make stale state notification work without failing the requests
+  (Noble Paul, shalin)
+
 ==================  5.0.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Thu Feb 26 13:04:28 2015
@@ -89,6 +89,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -96,6 +97,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import static java.util.Collections.singletonMap;
+
 /**
  * This filter looks at the incoming URL maps them to handlers defined in solrconfig.xml
  *
@@ -220,6 +223,8 @@ public class SolrDispatchFilter extends
     SolrCore core = null;
     SolrQueryRequest solrReq = null;
     Aliases aliases = null;
+    //The states of client that is invalid in this request
+    Map<String, Integer> invalidStates = null;
     
     if( request instanceof HttpServletRequest) {
       HttpServletRequest req = (HttpServletRequest)request;
@@ -310,11 +315,15 @@ public class SolrDispatchFilter extends
             String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
             // don't proxy for internal update requests
             SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
-            checkStateIsValid(cores, queryParams.get(CloudSolrClient.STATE_VERSION));
+            invalidStates = checkStateIsValid(cores, queryParams.get(CloudSolrClient.STATE_VERSION));
             if (coreUrl != null
                 && queryParams
                     .get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
               path = path.substring(idx);
+              if (invalidStates != null) {
+                //it does not make sense to send the request to a remote node
+                throw new SolrException(ErrorCode.INVALID_STATE, new String(ZkStateReader.toJSON(invalidStates), org.apache.lucene.util.IOUtils.UTF_8));
+              }
               remoteQuery(coreUrl + path, req, solrReq, resp);
               return;
             } else {
@@ -371,7 +380,7 @@ public class SolrDispatchFilter extends
               if( "/select".equals( path ) || "/select/".equals( path ) ) {
                 solrReq = parser.parse( core, path, req );
 
-                checkStateIsValid(cores,solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
+                invalidStates = checkStateIsValid(cores,solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
                 String qt = solrReq.getParams().get( CommonParams.QT );
                 handler = core.getRequestHandler( qt );
                 if( handler == null ) {
@@ -418,7 +427,8 @@ public class SolrDispatchFilter extends
                   resp.addHeader(entry.getKey(), entry.getValue());
                 }
                QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
-               writeResponse(solrRsp, response, responseWriter, solrReq, reqMethod);
+              if(invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
+              writeResponse(solrRsp, response, responseWriter, solrReq, reqMethod);
             }
             return; // we are done with a valid handler
           }
@@ -461,21 +471,24 @@ public class SolrDispatchFilter extends
     chain.doFilter(request, response);
   }
 
-  private void checkStateIsValid(CoreContainer cores, String stateVer) {
+  private Map<String , Integer> checkStateIsValid(CoreContainer cores, String stateVer) {
+    Map<String, Integer> result = null;
+    String[] pairs = null;
     if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) {
       // many have multiple collections separated by |
-      String[] pairs = StringUtils.split(stateVer, '|');
+      pairs = StringUtils.split(stateVer, '|');
       for (String pair : pairs) {
         String[] pcs = StringUtils.split(pair, ':');
         if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) {
-          Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0], Integer.parseInt(pcs[1]));
-          
-          if (Boolean.TRUE != status) {
-            throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair + "valid : " + status);
+          Integer status = cores.getZkController().getZkStateReader().compareStateVersions(pcs[0], Integer.parseInt(pcs[1]));
+          if(status != null ){
+            if(result == null) result =  new HashMap<>();
+            result.put(pcs[0], status);
           }
         }
       }
     }
+    return result;
   }
 
   private void processAliases(SolrQueryRequest solrReq, Aliases aliases,
@@ -747,6 +760,11 @@ public class SolrDispatchFilter extends
                              QueryResponseWriter responseWriter, SolrQueryRequest solrReq, Method reqMethod)
           throws IOException {
     try {
+      Object invalidStates = solrReq.getContext().get(CloudSolrClient.STATE_VERSION);
+      //This is the last item added to the response and the client would expect it that way.
+      //If that assumption is changed , it would fail. This is done to avoid an O(n) scan on
+      // the response for each request
+      if(invalidStates != null) solrRsp.add(CloudSolrClient.STATE_VERSION, invalidStates);
       // Now write it out
       final String ct = responseWriter.getContentType(solrReq, solrRsp);
       // don't call setContentType on null

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java Thu Feb 26 13:04:28 2015
@@ -529,7 +529,7 @@ public class CloudSolrClient extends Sol
       }
     }
 
-    DocCollection col = getDocCollection(clusterState, collection);
+    DocCollection col = getDocCollection(clusterState, collection,null);
 
     DocRouter router = col.getRouter();
     
@@ -774,7 +774,7 @@ public class CloudSolrClient extends Sol
       StringBuilder stateVerParamBuilder = null;
       for (String requestedCollection : requestedCollectionNames) {
         // track the version of state we're using on the client side using the _stateVer_ param
-        DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
+        DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null);
         int collVer = coll.getZNodeVersion();
         if (coll.getStateFormat()>1) {
           if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
@@ -807,6 +807,15 @@ public class CloudSolrClient extends Sol
     NamedList<Object> resp = null;
     try {
       resp = sendRequest(request);
+      Object o = resp.get(STATE_VERSION, resp.size()-1);
+      if(o != null && o instanceof Map) {
+        Map invalidStates = (Map) o;
+        for (Object invalidEntries : invalidStates.entrySet()) {
+          Map.Entry e = (Map.Entry) invalidEntries;
+          getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue());
+        }
+
+      }
     } catch (Exception exc) {
 
       Throwable rootCause = SolrException.getRootCause(exc);
@@ -860,7 +869,7 @@ public class CloudSolrClient extends Sol
           !requestedCollections.isEmpty() &&
           wasCommError) {
         for (DocCollection ext : requestedCollections) {
-          DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName());
+          DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null);
           if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
             // looks like we couldn't reach the server because the state was stale == retry
             stateWasStale = true;
@@ -949,7 +958,7 @@ public class CloudSolrClient extends Sol
       // add it to the Map of slices.
       Map<String,Slice> slices = new HashMap<>();
       for (String collectionName : collectionNames) {
-        DocCollection col = getDocCollection(clusterState, collectionName);
+        DocCollection col = getDocCollection(clusterState, collectionName, null);
         Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
         ClientUtils.addSlices(slices, collectionName, routeSlices, true);
       }
@@ -1099,10 +1108,13 @@ public class CloudSolrClient extends Sol
   }
 
 
-  protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
+  protected DocCollection getDocCollection(ClusterState clusterState, String collection, Integer expectedVersion) throws SolrException {
     if(collection == null) return null;
     DocCollection col = getFromCache(collection);
-    if(col != null) return col;
+    if(col != null) {
+      if(expectedVersion == null) return col;
+      if(expectedVersion.intValue() == col.getZNodeVersion()) return col;
+    }
 
     ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection);
     if(ref == null){
@@ -1118,8 +1130,15 @@ public class CloudSolrClient extends Sol
     synchronized (lock){
       //we have waited for sometime just check once again
       col = getFromCache(collection);
-      if(col !=null) return col;
-      col = ref.get();
+      if(col !=null) {
+        if(expectedVersion == null) return col;
+        if(expectedVersion.intValue() == col.getZNodeVersion()) {
+          return col;
+        } else {
+          collectionStateCache.remove(collection);
+        }
+      }
+      col = ref.get();//this is a call to ZK
     }
     if(col == null ) return  null;
     if(col.getStateFormat() >1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Thu Feb 26 13:04:28 2015
@@ -268,13 +268,13 @@ public class ZkStateReader implements Cl
     return aliases;
   }
 
-  public Boolean checkValid(String coll, int version) {
+  public Integer compareStateVersions(String coll, int version) {
     DocCollection collection = clusterState.getCollectionOrNull(coll);
     if (collection == null) return null;
     if (collection.getZNodeVersion() < version) {
       log.debug("server older than client {}<{}", collection.getZNodeVersion(), version);
       DocCollection nu = getCollectionLive(this, coll);
-      if (nu == null) return null;
+      if (nu == null) return -1 ;
       if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
         updateWatchedCollection(nu);
         collection = nu;
@@ -282,12 +282,12 @@ public class ZkStateReader implements Cl
     }
     
     if (collection.getZNodeVersion() == version) {
-      return Boolean.TRUE;
+      return null;
     }
     
     log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion());
     
-    return Boolean.FALSE;
+    return collection.getZNodeVersion();
   }
   
   public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,

Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java Thu Feb 26 13:04:28 2015
@@ -502,7 +502,7 @@ public class CloudSolrClientTest extends
 
     try (CloudSolrClient client = createCloudClient(null)) {
       String collectionName = "checkStateVerCol";
-      createCollection(collectionName, client, 2, 2);
+      createCollection(collectionName, client, 1, 3);
       waitForRecoveriesToFinish(collectionName, false);
       DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
       Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
@@ -520,19 +520,13 @@ public class CloudSolrClientTest extends
 
         q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
 
-        try {
-          solrClient.query(q);
-          log.info("expected query error");
-        } catch (HttpSolrClient.RemoteSolrException e) {
-          sse = e;
-        }
-
-        assertNotNull(sse);
-        assertEquals(" Error code should be ", sse.code(), SolrException.ErrorCode.INVALID_STATE.code);
-
+        QueryResponse rsp = solrClient.query(q);
+        Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
+        assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
+        assertNotNull(m.get(collectionName));
       }
 
-      //now send the request to another node that does n ot serve the collection
+      //now send the request to another node that does not serve the collection
 
       Set<String> allNodesOfColl = new HashSet<>();
       for (Slice slice : coll.getSlices()) {
@@ -541,27 +535,28 @@ public class CloudSolrClientTest extends
         }
       }
       String theNode = null;
-      for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
+      Set<String> liveNodes = client.getZkStateReader().getClusterState().getLiveNodes();
+      for (String s : liveNodes) {
         String n = client.getZkStateReader().getBaseUrlForNodeName(s);
-        if(!allNodesOfColl.contains(s)){
+        if(!allNodesOfColl.contains(n)){
           theNode = n;
           break;
         }
       }
-      log.info("thenode which does not serve this collection{} ",theNode);
+      log.info("the node which does not serve this collection{} ",theNode);
       assertNotNull(theNode);
 
       try (SolrClient solrClient = new HttpSolrClient(theNode + "/"+collectionName)) {
 
-        q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + coll.getZNodeVersion());
+        q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion()-1));
         try {
-          solrClient.query(q);
+          QueryResponse rsp = solrClient.query(q);
           log.info("error was expected");
         } catch (HttpSolrClient.RemoteSolrException e) {
           sse = e;
         }
         assertNotNull(sse);
-        assertEquals(" Error code should be ", sse.code(), SolrException.ErrorCode.INVALID_STATE.code);
+        assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
       }
     }