You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/02/26 14:04:29 UTC
svn commit: r1662439 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/servlet/
solrj/src/java/org/apache/solr/client/solrj/impl/
solrj/src/java/org/apache/solr/common/cloud/
solrj/src/test/org/apache/solr/client/solrj/impl/
Author: noble
Date: Thu Feb 26 13:04:28 2015
New Revision: 1662439
URL: http://svn.apache.org/r1662439
Log:
SOLR-7130: Make stale state notification work without failing the requests
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Feb 26 13:04:28 2015
@@ -208,6 +208,9 @@ Other Changes
* SOLR-7160: Rename ConfigSolr to NodeConfig, and decouple it from xml
representation (Alan Woodward)
+* SOLR-7130: Make stale state notification work without failing the requests
+ (Noble Paul, shalin)
+
================== 5.0.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java Thu Feb 26 13:04:28 2015
@@ -89,6 +89,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -96,6 +97,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import static java.util.Collections.singletonMap;
+
/**
* This filter looks at the incoming URL maps them to handlers defined in solrconfig.xml
*
@@ -220,6 +223,8 @@ public class SolrDispatchFilter extends
SolrCore core = null;
SolrQueryRequest solrReq = null;
Aliases aliases = null;
+ //The client's collection states that are invalid (stale) in this request
+ Map<String, Integer> invalidStates = null;
if( request instanceof HttpServletRequest) {
HttpServletRequest req = (HttpServletRequest)request;
@@ -310,11 +315,15 @@ public class SolrDispatchFilter extends
String coreUrl = getRemotCoreUrl(cores, corename, origCorename);
// don't proxy for internal update requests
SolrParams queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
- checkStateIsValid(cores, queryParams.get(CloudSolrClient.STATE_VERSION));
+ invalidStates = checkStateIsValid(cores, queryParams.get(CloudSolrClient.STATE_VERSION));
if (coreUrl != null
&& queryParams
.get(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM) == null) {
path = path.substring(idx);
+ if (invalidStates != null) {
+ //it does not make sense to send the request to a remote node
+ throw new SolrException(ErrorCode.INVALID_STATE, new String(ZkStateReader.toJSON(invalidStates), org.apache.lucene.util.IOUtils.UTF_8));
+ }
remoteQuery(coreUrl + path, req, solrReq, resp);
return;
} else {
@@ -371,7 +380,7 @@ public class SolrDispatchFilter extends
if( "/select".equals( path ) || "/select/".equals( path ) ) {
solrReq = parser.parse( core, path, req );
- checkStateIsValid(cores,solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
+ invalidStates = checkStateIsValid(cores,solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
String qt = solrReq.getParams().get( CommonParams.QT );
handler = core.getRequestHandler( qt );
if( handler == null ) {
@@ -418,7 +427,8 @@ public class SolrDispatchFilter extends
resp.addHeader(entry.getKey(), entry.getValue());
}
QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
- writeResponse(solrRsp, response, responseWriter, solrReq, reqMethod);
+ if(invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
+ writeResponse(solrRsp, response, responseWriter, solrReq, reqMethod);
}
return; // we are done with a valid handler
}
@@ -461,21 +471,24 @@ public class SolrDispatchFilter extends
chain.doFilter(request, response);
}
- private void checkStateIsValid(CoreContainer cores, String stateVer) {
+ private Map<String , Integer> checkStateIsValid(CoreContainer cores, String stateVer) {
+ Map<String, Integer> result = null;
+ String[] pairs = null;
if (stateVer != null && !stateVer.isEmpty() && cores.isZooKeeperAware()) {
// many have multiple collections separated by |
- String[] pairs = StringUtils.split(stateVer, '|');
+ pairs = StringUtils.split(stateVer, '|');
for (String pair : pairs) {
String[] pcs = StringUtils.split(pair, ':');
if (pcs.length == 2 && !pcs[0].isEmpty() && !pcs[1].isEmpty()) {
- Boolean status = cores.getZkController().getZkStateReader().checkValid(pcs[0], Integer.parseInt(pcs[1]));
-
- if (Boolean.TRUE != status) {
- throw new SolrException(ErrorCode.INVALID_STATE, "STATE STALE: " + pair + "valid : " + status);
+ Integer status = cores.getZkController().getZkStateReader().compareStateVersions(pcs[0], Integer.parseInt(pcs[1]));
+ if(status != null ){
+ if(result == null) result = new HashMap<>();
+ result.put(pcs[0], status);
}
}
}
}
+ return result;
}
private void processAliases(SolrQueryRequest solrReq, Aliases aliases,
@@ -747,6 +760,11 @@ public class SolrDispatchFilter extends
QueryResponseWriter responseWriter, SolrQueryRequest solrReq, Method reqMethod)
throws IOException {
try {
+ Object invalidStates = solrReq.getContext().get(CloudSolrClient.STATE_VERSION);
+ //This must be the last item added to the response: the client looks for it only at the last position.
+ //If anything is added after it, the client will not see the stale-state notification. This is done to
+ // avoid an O(n) scan on the response for each request
+ if(invalidStates != null) solrRsp.add(CloudSolrClient.STATE_VERSION, invalidStates);
// Now write it out
final String ct = responseWriter.getContentType(solrReq, solrRsp);
// don't call setContentType on null
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java Thu Feb 26 13:04:28 2015
@@ -529,7 +529,7 @@ public class CloudSolrClient extends Sol
}
}
- DocCollection col = getDocCollection(clusterState, collection);
+ DocCollection col = getDocCollection(clusterState, collection,null);
DocRouter router = col.getRouter();
@@ -774,7 +774,7 @@ public class CloudSolrClient extends Sol
StringBuilder stateVerParamBuilder = null;
for (String requestedCollection : requestedCollectionNames) {
// track the version of state we're using on the client side using the _stateVer_ param
- DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
+ DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection,null);
int collVer = coll.getZNodeVersion();
if (coll.getStateFormat()>1) {
if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
@@ -807,6 +807,15 @@ public class CloudSolrClient extends Sol
NamedList<Object> resp = null;
try {
resp = sendRequest(request);
+ Object o = resp.get(STATE_VERSION, resp.size()-1);
+ if(o != null && o instanceof Map) {
+ Map invalidStates = (Map) o;
+ for (Object invalidEntries : invalidStates.entrySet()) {
+ Map.Entry e = (Map.Entry) invalidEntries;
+ getDocCollection(getZkStateReader().getClusterState(),(String)e.getKey(), (Integer)e.getValue());
+ }
+
+ }
} catch (Exception exc) {
Throwable rootCause = SolrException.getRootCause(exc);
@@ -860,7 +869,7 @@ public class CloudSolrClient extends Sol
!requestedCollections.isEmpty() &&
wasCommError) {
for (DocCollection ext : requestedCollections) {
- DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName());
+ DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(),null);
if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
// looks like we couldn't reach the server because the state was stale == retry
stateWasStale = true;
@@ -949,7 +958,7 @@ public class CloudSolrClient extends Sol
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<>();
for (String collectionName : collectionNames) {
- DocCollection col = getDocCollection(clusterState, collectionName);
+ DocCollection col = getDocCollection(clusterState, collectionName, null);
Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
ClientUtils.addSlices(slices, collectionName, routeSlices, true);
}
@@ -1099,10 +1108,13 @@ public class CloudSolrClient extends Sol
}
- protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
+ protected DocCollection getDocCollection(ClusterState clusterState, String collection, Integer expectedVersion) throws SolrException {
if(collection == null) return null;
DocCollection col = getFromCache(collection);
- if(col != null) return col;
+ if(col != null) {
+ if(expectedVersion == null) return col;
+ if(expectedVersion.intValue() == col.getZNodeVersion()) return col;
+ }
ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection);
if(ref == null){
@@ -1118,8 +1130,15 @@ public class CloudSolrClient extends Sol
synchronized (lock){
//we have waited for sometime just check once again
col = getFromCache(collection);
- if(col !=null) return col;
- col = ref.get();
+ if(col !=null) {
+ if(expectedVersion == null) return col;
+ if(expectedVersion.intValue() == col.getZNodeVersion()) {
+ return col;
+ } else {
+ collectionStateCache.remove(collection);
+ }
+ }
+ col = ref.get();//this is a call to ZK
}
if(col == null ) return null;
if(col.getStateFormat() >1) collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Thu Feb 26 13:04:28 2015
@@ -268,13 +268,13 @@ public class ZkStateReader implements Cl
return aliases;
}
- public Boolean checkValid(String coll, int version) {
+ public Integer compareStateVersions(String coll, int version) {
DocCollection collection = clusterState.getCollectionOrNull(coll);
if (collection == null) return null;
if (collection.getZNodeVersion() < version) {
log.debug("server older than client {}<{}", collection.getZNodeVersion(), version);
DocCollection nu = getCollectionLive(this, coll);
- if (nu == null) return null;
+ if (nu == null) return -1 ;
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
updateWatchedCollection(nu);
collection = nu;
@@ -282,12 +282,12 @@ public class ZkStateReader implements Cl
}
if (collection.getZNodeVersion() == version) {
- return Boolean.TRUE;
+ return null;
}
log.debug("wrong version from client {}!={} ", version, collection.getZNodeVersion());
- return Boolean.FALSE;
+ return collection.getZNodeVersion();
}
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java?rev=1662439&r1=1662438&r2=1662439&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java Thu Feb 26 13:04:28 2015
@@ -502,7 +502,7 @@ public class CloudSolrClientTest extends
try (CloudSolrClient client = createCloudClient(null)) {
String collectionName = "checkStateVerCol";
- createCollection(collectionName, client, 2, 2);
+ createCollection(collectionName, client, 1, 3);
waitForRecoveriesToFinish(collectionName, false);
DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
@@ -520,19 +520,13 @@ public class CloudSolrClientTest extends
q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion() - 1)); //an older version expect error
- try {
- solrClient.query(q);
- log.info("expected query error");
- } catch (HttpSolrClient.RemoteSolrException e) {
- sse = e;
- }
-
- assertNotNull(sse);
- assertEquals(" Error code should be ", sse.code(), SolrException.ErrorCode.INVALID_STATE.code);
-
+ QueryResponse rsp = solrClient.query(q);
+ Map m = (Map) rsp.getResponse().get(CloudSolrClient.STATE_VERSION, rsp.getResponse().size()-1);
+ assertNotNull("Expected an extra information from server with the list of invalid collection states", m);
+ assertNotNull(m.get(collectionName));
}
- //now send the request to another node that does n ot serve the collection
+ //now send the request to another node that does not serve the collection
Set<String> allNodesOfColl = new HashSet<>();
for (Slice slice : coll.getSlices()) {
@@ -541,27 +535,28 @@ public class CloudSolrClientTest extends
}
}
String theNode = null;
- for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
+ Set<String> liveNodes = client.getZkStateReader().getClusterState().getLiveNodes();
+ for (String s : liveNodes) {
String n = client.getZkStateReader().getBaseUrlForNodeName(s);
- if(!allNodesOfColl.contains(s)){
+ if(!allNodesOfColl.contains(n)){
theNode = n;
break;
}
}
- log.info("thenode which does not serve this collection{} ",theNode);
+ log.info("the node which does not serve this collection{} ",theNode);
assertNotNull(theNode);
try (SolrClient solrClient = new HttpSolrClient(theNode + "/"+collectionName)) {
- q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + coll.getZNodeVersion());
+ q.setParam(CloudSolrClient.STATE_VERSION, collectionName + ":" + (coll.getZNodeVersion()-1));
try {
- solrClient.query(q);
+ QueryResponse rsp = solrClient.query(q);
log.info("error was expected");
} catch (HttpSolrClient.RemoteSolrException e) {
sse = e;
}
assertNotNull(sse);
- assertEquals(" Error code should be ", sse.code(), SolrException.ErrorCode.INVALID_STATE.code);
+ assertEquals(" Error code should be 510", SolrException.ErrorCode.INVALID_STATE.code, sse.code());
}
}