You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2014/06/19 18:25:36 UTC
svn commit: r1603938 [5/5] - in /lucene/dev/branches/solr-5473: ./
dev-tools/ lucene/ lucene/analysis/ lucene/analysis/common/
lucene/analysis/common/src/java/org/apache/lucene/analysis/hunspell/
lucene/analysis/common/src/test/org/apache/lucene/analys...
Modified: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/AssignTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/AssignTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/AssignTest.java Thu Jun 19 16:25:31 2014
@@ -86,7 +86,7 @@ public class AssignTest extends SolrTest
collectionStates.put(cname, docCollection);
Set<String> liveNodes = new HashSet<>();
- ClusterState state = new ClusterState(-1,liveNodes, collectionStates);
+ ClusterState state = new ClusterState(-1,liveNodes, collectionStates, ClusterStateTest.getMockZkStateReader(collectionStates.keySet()));
String nodeName = Assign.assignNode("collection1", state);
assertEquals("core_node2", nodeName);
Modified: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java Thu Jun 19 16:25:31 2014
@@ -62,10 +62,10 @@ public class ClusterStateTest extends So
collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT));
ZkStateReader zkStateReaderMock = getMockZkStateReader(collectionStates.keySet());
- ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
+ ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates,zkStateReaderMock);
byte[] bytes = ZkStateReader.toJSON(clusterState);
// System.out.println("#################### " + new String(bytes));
- ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
+ ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes,zkStateReaderMock,null);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
@@ -73,13 +73,13 @@ public class ClusterStateTest extends So
assertEquals("Poperties not copied properly", replica.getStr("prop1"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop1"));
assertEquals("Poperties not copied properly", replica.getStr("prop2"), loadedClusterState.getSlice("collection1", "shard1").getReplicasMap().get("node1").getStr("prop2"));
- loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes);
+ loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes, getMockZkStateReader(Collections.<String>emptySet()),null );
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
assertEquals("Should not have collections", 0, loadedClusterState.getCollections().size());
- loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes);
+ loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes,getMockZkStateReader(Collections.<String>emptySet()),null);
assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
.getLiveNodes().size());
@@ -89,6 +89,13 @@ public class ClusterStateTest extends So
public static ZkStateReader getMockZkStateReader(final Set<String> collections) {
ZkStateReader mock = createMock(ZkStateReader.class);
EasyMock.reset(mock);
+ mock.getAllCollections();
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Set<String>>() {
+ @Override
+ public Set<String> answer() throws Throwable {
+ return collections;
+ }
+ }).anyTimes();
EasyMock.replay(mock);
return mock;
Added: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java?rev=1603938&view=auto
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java (added)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/ExternalCollectionsTest.java Thu Jun 19 16:25:31 2014
@@ -0,0 +1,119 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public class ExternalCollectionsTest extends AbstractFullDistribZkTestBase {
+ private CloudSolrServer client;
+
+ @BeforeClass
+ public static void beforeThisClass2() throws Exception {
+
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ System.setProperty("numShards", Integer.toString(sliceCount));
+ System.setProperty("solr.xml.persist", "true");
+ client = createCloudClient(null);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ client.shutdown();
+ }
+
+ protected String getSolrXml() {
+ return "solr-no-core.xml";
+ }
+
+ public ExternalCollectionsTest() {
+ fixShardCount = true;
+
+ sliceCount = 2;
+ shardCount = 4;
+
+ checkCreatedVsState = false;
+ }
+
+
+ @Override
+ public void doTest() throws Exception {
+ testZkNodeLocation();
+ }
+
+
+ boolean externalColl = false;
+
+ @Override
+ protected int getStateFormat() {
+ return externalColl ? 2:1;
+ }
+
+ private void testZkNodeLocation() throws Exception{
+ externalColl=true;
+
+ String collectionName = "myExternColl";
+
+ createCollection(collectionName, client, 2, 2);
+
+ waitForRecoveriesToFinish(collectionName, false);
+ assertTrue("does not exist collection state externally",
+ cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
+ Stat stat = new Stat();
+ byte[] data = cloudClient.getZkStateReader().getZkClient().getData(ZkStateReader.getCollectionPath(collectionName), null, stat, true);
+ DocCollection c = ZkStateReader.getCollectionLive(cloudClient.getZkStateReader(), collectionName);
+ ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+ assertEquals("The zkversion of the nodes must be same zkver:" + stat.getVersion() , stat.getVersion(),clusterState.getCollection(collectionName).getZNodeVersion() );
+ assertTrue("DocCllection#getStateFormat() must be > 1", cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getStateFormat() > 1);
+
+
+ // remove collection
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("action", CollectionParams.CollectionAction.DELETE.toString());
+ params.set("name", collectionName);
+ QueryRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+ if (client == null) {
+ client = createCloudClient(null);
+ }
+
+ client.request(request);
+
+ checkForMissingCollection(collectionName);
+ assertFalse("collection state should not exist externally", cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
+
+ }
+}
+
+
+
Modified: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java Thu Jun 19 16:25:31 2014
@@ -50,9 +50,9 @@ public class SliceStateTest extends Solr
collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
ZkStateReader mockZkStateReader = ClusterStateTest.getMockZkStateReader(collectionStates.keySet());
- ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
+ ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates, mockZkStateReader);
byte[] bytes = ZkStateReader.toJSON(clusterState);
- ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
+ ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes, mockZkStateReader,null);
assertEquals("Default state not set to active", "active", loadedClusterState.getSlice("collection1", "shard1").getState());
}
Modified: lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/core/src/test/org/apache/solr/cloud/SliceStateUpdateTest.java Thu Jun 19 16:25:31 2014
@@ -30,6 +30,7 @@ import org.apache.solr.common.cloud.ZkNo
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -39,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.Map;
@Slow
@@ -181,8 +181,22 @@ public class SliceStateUpdateTest extend
assertEquals("shard1", slices.get("shard1").getName());
assertEquals("inactive", slices.get("shard1").getState());
- container1.getZkController().getOverseerElector().getContext().cancelElection();
- container2.getZkController().getOverseerElector().getContext().cancelElection();
+
+ cancelElection(container1);
+ cancelElection(container2);
+ }
+
+ private void cancelElection(CoreContainer cc) throws InterruptedException, KeeperException {
+ for(int i=0;i<10;) {
+ if(cc.getZkController().getOverseerElector().getContext().leaderSeqPath ==null){
+ Thread.sleep(20);
+ continue;
+ }else{
+ break;
+ }
+ }
+ cc.getZkController().getOverseerElector().getContext().cancelElection();
+
}
private void closeThread(OverseerThread updaterThread) {
Modified: lucene/dev/branches/solr-5473/solr/example/solr/collection1/conf/schema.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/example/solr/collection1/conf/schema.xml?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/example/solr/collection1/conf/schema.xml (original)
+++ lucene/dev/branches/solr-5473/solr/example/solr/collection1/conf/schema.xml Thu Jun 19 16:25:31 2014
@@ -1106,9 +1106,8 @@
<!-- Thai -->
<fieldType name="text_th" class="solr.TextField" positionIncrementGap="100">
<analyzer>
- <tokenizer class="solr.StandardTokenizerFactory"/>
+ <tokenizer class="solr.ThaiTokenizerFactory"/>
<filter class="solr.LowerCaseFilterFactory"/>
- <filter class="solr.ThaiWordFilterFactory"/>
<filter class="solr.StopFilterFactory" ignoreCase="true" words="lang/stopwords_th.txt" />
</analyzer>
</fieldType>
Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Thu Jun 19 16:25:31 2014
@@ -18,7 +18,9 @@ package org.apache.solr.client.solrj.imp
*/
import java.io.IOException;
+import java.net.ConnectException;
import java.net.MalformedURLException;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -30,13 +32,16 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ConnectTimeoutException;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
@@ -67,6 +72,8 @@ import org.apache.solr.common.util.Named
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* SolrJ client class to communicate with SolrCloud.
@@ -79,6 +86,8 @@ import org.apache.zookeeper.KeeperExcept
* with {@link #setIdField(String)}.
*/
public class CloudSolrServer extends SolrServer {
+ private static final Logger log = LoggerFactory.getLogger(CloudSolrServer.class);
+
private volatile ZkStateReader zkStateReader;
private String zkHost; // the zk server address
private int zkConnectTimeout = 10000;
@@ -87,6 +96,8 @@ public class CloudSolrServer extends Sol
private final LBHttpSolrServer lbServer;
private final boolean shutdownLBHttpSolrServer;
private HttpClient myClient;
+ //no of times collection state to be reloaded if stale state error is received
+ private static final int MAX_STALE_RETRIES = 5;
Random rand = new Random();
private final boolean updatesToLeaders;
@@ -95,6 +106,7 @@ public class CloudSolrServer extends Sol
.newCachedThreadPool(new SolrjNamedThreadFactory(
"CloudSolrServer ThreadPool"));
private String idField = "id";
+ public static final String STATE_VERSION = "_stateVer_";
private final Set<String> NON_ROUTABLE_PARAMS;
{
NON_ROUTABLE_PARAMS = new HashSet<>();
@@ -112,8 +124,36 @@ public class CloudSolrServer extends Sol
// NON_ROUTABLE_PARAMS.add(UpdateParams.ROLLBACK);
}
+ private volatile long timeToLive = 60* 1000L;
+
+
+ protected Map<String, ExpiringCachedDocCollection> collectionStateCache = new ConcurrentHashMap<String, ExpiringCachedDocCollection>(){
+ @Override
+ public ExpiringCachedDocCollection get(Object key) {
+ ExpiringCachedDocCollection val = super.get(key);
+ if(val == null) return null;
+ if(val.isExpired(timeToLive)) {
+ super.remove(key);
+ return null;
+ }
+ return val;
+ }
+
+ };
+ class ExpiringCachedDocCollection {
+ DocCollection cached;
+ long cachedAt;
+ ExpiringCachedDocCollection(DocCollection cached) {
+ this.cached = cached;
+ this.cachedAt = System.currentTimeMillis();
+ }
+
+ boolean isExpired(long timeToLive) {
+ return (System.currentTimeMillis() - cachedAt) > timeToLive;
+ }
+ }
/**
* @param zkHost The client endpoint of the zookeeper quorum containing the cloud state,
@@ -127,6 +167,8 @@ public class CloudSolrServer extends Sol
this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = true;
shutdownLBHttpSolrServer = true;
+ setupStateVerParamOnQueryString(lbServer);
+
}
public CloudSolrServer(String zkHost, boolean updatesToLeaders)
@@ -138,6 +180,15 @@ public class CloudSolrServer extends Sol
this.lbServer.setParser(new BinaryResponseParser());
this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = true;
+ setupStateVerParamOnQueryString(lbServer);
+ }
+
+ /**Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json
+ * @param seconds ttl value in seconds
+ */
+ public void setCollectionCacheTTl(int seconds){
+ assert seconds > 0;
+ timeToLive = seconds*1000L;
}
/**
@@ -150,6 +201,7 @@ public class CloudSolrServer extends Sol
this.lbServer = lbServer;
this.updatesToLeaders = true;
shutdownLBHttpSolrServer = false;
+ setupStateVerParamOnQueryString(lbServer);
}
/**
@@ -163,8 +215,24 @@ public class CloudSolrServer extends Sol
this.lbServer = lbServer;
this.updatesToLeaders = updatesToLeaders;
shutdownLBHttpSolrServer = false;
+ setupStateVerParamOnQueryString(lbServer);
+
}
+ /**
+ * Used internally to setup the _stateVer_ param to be sent in the query string of requests
+ * coming from this instance.
+ */
+ protected void setupStateVerParamOnQueryString(LBHttpSolrServer lbServer) {
+ // setup the stateVer param to be passed in the query string of every request
+ Set<String> queryStringParams = lbServer.getQueryParams();
+ if (queryStringParams == null) {
+ queryStringParams = new HashSet<String>(2);
+ lbServer.setQueryParams(queryStringParams);
+ }
+ queryStringParams.add("_stateVer_");
+ }
+
public ResponseParser getParser() {
return lbServer.getParser();
}
@@ -238,8 +306,7 @@ public class CloudSolrServer extends Sol
if (zkStateReader == null) {
ZkStateReader zk = null;
try {
- zk = new ZkStateReader(zkHost, zkClientTimeout,
- zkConnectTimeout);
+ zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
zk.createClusterStateWatchersAndUpdate();
zkStateReader = zk;
} catch (InterruptedException e) {
@@ -302,7 +369,7 @@ public class CloudSolrServer extends Sol
}
}
- DocCollection col = clusterState.getCollection(collection);
+ DocCollection col = getDocCollection(clusterState, collection);
DocRouter router = col.getRouter();
@@ -519,7 +586,146 @@ public class CloudSolrServer extends Sol
}
@Override
- public NamedList<Object> request(SolrRequest request)
+ public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
+ SolrParams reqParams = request.getParams();
+ String collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
+ return requestWithRetryOnStaleState(request, 0, collection);
+ }
+
+ /**
+ * As this class doesn't watch external collections on the client side,
+ * there's a chance that the request will fail due to cached stale state,
+ * which means the state must be refreshed from ZK and retried.
+ */
+ protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
+ throws SolrServerException, IOException {
+
+ connect(); // important to call this before you start working with the ZkStateReader
+
+ // build up a _stateVer_ param to pass to the server containing all of the
+ // external collection state versions involved in this request, which allows
+ // the server to notify us that our cached state for one or more of the external
+ // collections is stale and needs to be refreshed ... this code has no impact on internal collections
+ String stateVerParam = null;
+ List<DocCollection> requestedExternalCollections = null;
+ if (collection != null && !request.getPath().startsWith("/admin")) { // don't do _stateVer_ checking for admin requests
+ Set<String> requestedCollectionNames = getCollectionList(getZkStateReader().getClusterState(), collection);
+
+ StringBuilder stateVerParamBuilder = null;
+ for (String requestedCollection : requestedCollectionNames) {
+ // track the version of state we're using on the client side using the _stateVer_ param
+ DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection);
+ int collVer = coll.getZNodeVersion();
+ if (coll.getStateFormat()>1) {
+ if(requestedExternalCollections == null) requestedExternalCollections = new ArrayList<>(requestedCollectionNames.size());
+ requestedExternalCollections.add(coll);
+
+ if (stateVerParamBuilder == null) {
+ stateVerParamBuilder = new StringBuilder();
+ } else {
+ stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
+ }
+
+ stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+ }
+ }
+
+ if (stateVerParamBuilder != null) {
+ stateVerParam = stateVerParamBuilder.toString();
+ }
+ }
+
+ if (request.getParams() instanceof ModifiableSolrParams) {
+ ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
+ if (stateVerParam != null) {
+ params.set(STATE_VERSION, stateVerParam);
+ } else {
+ params.remove(STATE_VERSION);
+ }
+ } // else: ??? how to set this ???
+
+ NamedList<Object> resp = null;
+ try {
+ resp = sendRequest(request);
+ } catch (Exception exc) {
+
+ Throwable rootCause = SolrException.getRootCause(exc);
+ // don't do retry support for admin requests or if the request doesn't have a collection specified
+ if (collection == null || request.getPath().startsWith("/admin")) {
+ if (exc instanceof SolrServerException) {
+ throw (SolrServerException)exc;
+ } else if (exc instanceof IOException) {
+ throw (IOException)exc;
+ }else if (exc instanceof RuntimeException) {
+ throw (RuntimeException) exc;
+ }
+ else {
+ throw new SolrServerException(rootCause);
+ }
+ }
+
+ int errorCode = (rootCause instanceof SolrException) ?
+ ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
+
+ log.error("Request to collection {} failed due to ("+errorCode+
+ ") {}, retry? "+retryCount, collection, rootCause.toString());
+
+ boolean wasCommError =
+ (rootCause instanceof ConnectException ||
+ rootCause instanceof ConnectTimeoutException ||
+ rootCause instanceof NoHttpResponseException ||
+ rootCause instanceof SocketException);
+
+ boolean stateWasStale = false;
+ if (retryCount < MAX_STALE_RETRIES &&
+ !requestedExternalCollections.isEmpty() &&
+ SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
+ {
+ // cached state for one or more external collections was stale
+ // re-issue request using updated state
+ stateWasStale = true;
+
+ // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
+ for (DocCollection ext : requestedExternalCollections) {
+ collectionStateCache.remove(ext.getName());
+ }
+ }
+
+ // if we experienced a communication error, it's worth checking the state
+ // with ZK just to make sure the node we're trying to hit is still part of the collection
+ if (retryCount < MAX_STALE_RETRIES && !stateWasStale && !requestedExternalCollections.isEmpty() && wasCommError) {
+ for (DocCollection ext : requestedExternalCollections) {
+ DocCollection latestStateFromZk = getZkStateReader().getCollection(ext.getName());
+ if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
+ // looks like we couldn't reach the server because the state was stale == retry
+ stateWasStale = true;
+ // we just pulled state from ZK, so update the cache so that the retry uses it
+ collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
+ }
+ }
+ }
+
+ requestedExternalCollections.clear(); // done with this
+
+ // if the state was stale, then we retry the request once with new state pulled from Zk
+ if (stateWasStale) {
+ log.warn("Re-trying request to collection(s) "+collection+" after stale state error from server.");
+ resp = requestWithRetryOnStaleState(request, retryCount+1, collection);
+ } else {
+ if (exc instanceof SolrServerException) {
+ throw (SolrServerException)exc;
+ } else if (exc instanceof IOException) {
+ throw (IOException)exc;
+ } else {
+ throw new SolrServerException(rootCause);
+ }
+ }
+ }
+
+ return resp;
+ }
+
+ protected NamedList<Object> sendRequest(SolrRequest request)
throws SolrServerException, IOException {
connect();
@@ -579,7 +785,7 @@ public class CloudSolrServer extends Sol
// add it to the Map of slices.
Map<String,Slice> slices = new HashMap<>();
for (String collectionName : collectionsList) {
- DocCollection col = clusterState.getCollection(collectionName);
+ DocCollection col = getDocCollection(clusterState, collectionName);
Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams , col);
ClientUtils.addSlices(slices, collectionName, routeSlices, true);
}
@@ -671,14 +877,17 @@ public class CloudSolrServer extends Sol
Aliases aliases = zkStateReader.getAliases();
String alias = aliases.getCollectionAlias(collectionName);
if (alias != null) {
- List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
+ List<String> aliasList = StrUtils.splitSmart(alias, ",", true);
collectionsList.addAll(aliasList);
continue;
}
-
- throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+
+ DocCollection docCollection = getDocCollection(clusterState, collection);
+ if (docCollection == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection not found: " + collectionName);
+ }
}
-
+
collectionsList.add(collectionName);
}
return collectionsList;
@@ -715,6 +924,28 @@ public class CloudSolrServer extends Sol
return updatesToLeaders;
}
+ protected DocCollection getDocCollection(ClusterState clusterState, String collection) throws SolrException {
+ ExpiringCachedDocCollection cachedState = collectionStateCache != null ? collectionStateCache.get(collection) : null;
+ if (cachedState != null && cachedState.cached != null) {
+ return cachedState.cached;
+ }
+
+ DocCollection col = clusterState.getCollectionOrNull(collection);
+ if(col == null ) return null;
+ collectionStateCache.put(collection, new ExpiringCachedDocCollection(col));
+ return col;
+ }
+
+ /**
+ * Extension point to allow sub-classes to override the ZkStateReader this class uses internally.
+ */
+ protected ZkStateReader createZkStateReader(String zkHost, int zkClientTimeout, int zkConnectTimeout)
+ throws InterruptedException, TimeoutException, IOException, KeeperException {
+ ZkStateReader zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
+ zk.createClusterStateWatchersAndUpdate();
+ return zk;
+ }
+
/**
* Useful for determining the minimum achieved replication factor across
* all shards involved in processing an update request, typically useful
Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Thu Jun 19 16:25:31 2014
@@ -46,36 +46,25 @@ public class ClusterState implements JSO
private final Map<String, DocCollection> collectionStates; // Map<collectionName, Map<sliceName,Slice>>
private Set<String> liveNodes;
+ private final ZkStateReader stateReader;
-
- /**
- * Use this constr when ClusterState is meant for publication.
- *
- * hashCode and equals will only depend on liveNodes and not clusterStateVersion.
- */
- @Deprecated
- public ClusterState(Set<String> liveNodes,
- Map<String, DocCollection> collectionStates) {
- this(null, liveNodes, collectionStates);
- }
-
-
-
/**
* Use this constr when ClusterState is meant for consumption.
*/
public ClusterState(Integer zkClusterStateVersion, Set<String> liveNodes,
- Map<String, DocCollection> collectionStates) {
+ Map<String, DocCollection> collectionStates, ZkStateReader stateReader) {
+ assert stateReader != null;
this.zkClusterStateVersion = zkClusterStateVersion;
this.liveNodes = new HashSet<>(liveNodes.size());
this.liveNodes.addAll(liveNodes);
this.collectionStates = new LinkedHashMap<>(collectionStates.size());
this.collectionStates.putAll(collectionStates);
+ this.stateReader = stateReader;
}
public ClusterState copyWith(Map<String,DocCollection> modified){
- ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates);
+ ClusterState result = new ClusterState(zkClusterStateVersion, liveNodes,collectionStates,stateReader);
for (Entry<String, DocCollection> e : modified.entrySet()) {
DocCollection c = e.getValue();
if(c == null) {
@@ -98,17 +87,10 @@ public class ClusterState implements JSO
if (slice == null) return null;
return slice.getLeader();
}
- private Replica getReplica(DocCollection coll, String replicaName) {
- if (coll == null) return null;
- for (Slice slice : coll.getSlices()) {
- Replica replica = slice.getReplica(replicaName);
- if (replica != null) return replica;
- }
- return null;
- }
public boolean hasCollection(String coll) {
- return collectionStates.containsKey(coll) ;
+ if (collectionStates.containsKey(coll)) return true;
+ return stateReader.getAllCollections().contains(coll);
}
/**
@@ -116,8 +98,9 @@ public class ClusterState implements JSO
* If the slice is known, do not use this method.
* coreNodeName is the same as replicaName
*/
- public Replica getReplica(final String collection, final String coreNodeName) {
- return getReplica(collectionStates.get(collection), coreNodeName);
+ public Replica getReplica(final String collection, final String coreNodeName, boolean cachedOnly) {
+ DocCollection coll = stateReader.getCollection(collection,cachedOnly);
+ return coll == null? null: coll.getReplica(coreNodeName);
}
/**
@@ -153,6 +136,35 @@ public class ClusterState implements JSO
return coll.getActiveSlices();
}
+ /**
+ * Get the {@code DocCollection} object if available. This method will
+ * never hit ZooKeeper and attempt to fetch collection from locally available
+ * state only.
+ *
+ * @param collection the name of the collection
+ * @return the {@link org.apache.solr.common.cloud.DocCollection} or null if not found
+ */
+ private DocCollection getCachedCollection(String collection) {
+ DocCollection c = collectionStates.get(collection);
+ if (c != null) return c;
+ if (!stateReader.getAllCollections().contains(collection)) return null;
+ return stateReader.getCollection(collection, true); // return from cache
+ }
+
+ /** expert internal use only
+ * Gets the replica from caches by the core name (assuming the slice is unknown) or null if replica is not found.
+ * If the slice is known, do not use this method.
+ * coreNodeName is the same as replicaName
+ */
+ public Replica getCachedReplica(String collectionName, String coreNodeName) {
+ DocCollection c = getCachedCollection(collectionName);
+ if (c == null) return null;
+ for (Slice slice : c.getSlices()) {
+ Replica replica = slice.getReplica(coreNodeName);
+ if (replica != null) return replica;
+ }
+ return null;
+ }
/**
* Get the named DocCollection object, or throw an exception if it doesn't exist.
@@ -163,23 +175,26 @@ public class ClusterState implements JSO
return coll;
}
-
public DocCollection getCollectionOrNull(String coll) {
- return collectionStates.get(coll);
+ DocCollection c = collectionStates.get(coll);
+ if (c != null) return c;
+ if (!stateReader.getAllCollections().contains(coll)) return null;
+ return stateReader.getCollection(coll);
}
/**
* Get collection names.
*/
public Set<String> getCollections() {
- return collectionStates.keySet();
+ return stateReader.getAllCollections();
}
/**
+ * For internal use only
* @return Map<collectionName, Map<sliceName,Slice>>
*/
- public Map<String, DocCollection> getCollectionStates() {
- return Collections.unmodifiableMap(collectionStates);
+ Map<String, DocCollection> getCollectionStates() {
+ return collectionStates;
}
/**
@@ -238,7 +253,7 @@ public class ClusterState implements JSO
Stat stat = new Stat();
byte[] state = zkClient.getData(ZkStateReader.CLUSTER_STATE,
null, stat, true);
- return load(stat.getVersion(), state, liveNodes);
+ return load(stat.getVersion(), state, liveNodes, stateReader, ZkStateReader.CLUSTER_STATE);
}
@@ -250,25 +265,34 @@ public class ClusterState implements JSO
* @param version zk version of the clusterstate.json file (bytes)
* @param bytes clusterstate.json as a byte array
* @param liveNodes list of live nodes
+ * @param stateReader The ZkStateReader for this clusterstate
+ * @param znode the znode from which this data is read from
* @return the ClusterState
*/
- public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
+ public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, ZkStateReader stateReader, String znode) {
// System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
if (bytes == null || bytes.length == 0) {
- return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
+ return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap(),stateReader);
}
Map<String, Object> stateMap = (Map<String, Object>) ZkStateReader.fromJSON(bytes);
Map<String,DocCollection> collections = new LinkedHashMap<>(stateMap.size());
for (Entry<String, Object> entry : stateMap.entrySet()) {
String collectionName = entry.getKey();
- DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version);
+ DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
collections.put(collectionName, coll);
}
// System.out.println("######## ClusterState.load result:" + collections);
- return new ClusterState( version, liveNodes, collections);
+ return new ClusterState( version, liveNodes, collections,stateReader);
}
+ /**
+ * @deprecated use {@link #load(Integer, byte[], Set, ZkStateReader, String)}
+ */
+ @Deprecated
+ public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes){
+ return load(version == null ? -1: version, bytes, liveNodes,null,null);
+ }
public static Aliases load(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
@@ -279,7 +303,7 @@ public class ClusterState implements JSO
return new Aliases(aliasMap);
}
- private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version) {
+ private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
Map<String,Object> props;
Map<String,Slice> slices;
@@ -306,7 +330,7 @@ public class ClusterState implements JSO
router = DocRouter.getDocRouter((String) routerProps.get("name"));
}
- return new DocCollection(name, slices, props, router, version);
+ return new DocCollection(name, slices, props, router, version,znode);
}
private static Map<String,Slice> makeSlices(Map<String,Object> genericSlices) {
@@ -334,7 +358,7 @@ public class ClusterState implements JSO
*
* @return null if ClusterState was created for publication, not consumption
*/
- public Integer getZkClusterStateVersion() {
+ public Integer getZnodeVersion() {
return zkClusterStateVersion;
}
@@ -364,6 +388,9 @@ public class ClusterState implements JSO
}
+ public ZkStateReader getStateReader(){
+ return stateReader;
+ }
/**
* Internal API used only by ZkStateReader
@@ -372,9 +399,5 @@ public class ClusterState implements JSO
this.liveNodes = liveNodes;
}
- public DocCollection getCommonCollection(String name){
- return collectionStates.get(name);
-
- }
}
Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java Thu Jun 19 16:25:31 2014
@@ -21,7 +21,6 @@ import org.noggit.JSONUtil;
import org.noggit.JSONWriter;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -33,15 +32,17 @@ import java.util.Map;
public class DocCollection extends ZkNodeProps {
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
- private int version;
+ public static final String STATE_FORMAT = "stateFormat";
+ private int znodeVersion;
private final String name;
private final Map<String, Slice> slices;
private final Map<String, Slice> activeSlices;
private final DocRouter router;
+ private final String znode;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
- this(name, slices, props, router, -1);
+ this(name, slices, props, router, -1, ZkStateReader.CLUSTER_STATE);
}
/**
@@ -49,9 +50,9 @@ public class DocCollection extends ZkNod
* @param slices The logical shards of the collection. This is used directly and a copy is not made.
* @param props The properties of the slice. This is used directly and a copy is not made.
*/
- public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) {
+ public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
super( props==null ? props = new HashMap<String,Object>() : props);
- this.version = zkVersion;
+ this.znodeVersion = zkVersion;
this.name = name;
this.slices = slices;
@@ -65,10 +66,14 @@ public class DocCollection extends ZkNod
this.activeSlices.put(slice.getKey(), slice.getValue());
}
this.router = router;
-
+ this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
assert name != null && slices != null;
}
+ public DocCollection copyWith(Map<String, Slice> slices){
+ return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
+ }
+
/**
* Return collection name.
@@ -110,9 +115,16 @@ public class DocCollection extends ZkNod
return activeSlices;
}
- public int getVersion(){
- return version;
+ public int getZNodeVersion(){
+ return znodeVersion;
+ }
+ public int getStateFormat(){
+ return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1:2;
+ }
+
+ public String getZNode(){
+ return znode;
}
@@ -132,4 +144,12 @@ public class DocCollection extends ZkNod
all.put(SHARDS, slices);
jsonWriter.write(all);
}
+
+ public Replica getReplica(String coreNodeName) {
+ for (Slice slice : slices.values()) {
+ Replica replica = slice.getReplica(coreNodeName);
+ if (replica != null) return replica;
+ }
+ return null;
+ }
}
Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Thu Jun 19 16:25:31 2014
@@ -68,6 +68,10 @@ public class SolrZkClient {
private ConnectionManager connManager;
+ static int instcount = 0;
+
+ public int inst = instcount++;
+
private volatile SolrZooKeeper keeper;
private ZkCmdExecutor zkCmdExecutor;
@@ -75,6 +79,10 @@ public class SolrZkClient {
private volatile boolean isClosed = false;
private ZkClientConnectionStrategy zkClientConnectionStrategy;
private int zkClientTimeout;
+
+// {
+// log.warn("created inst : "+inst, new Exception("debugcreate"));
+// }
public int getZkClientTimeout() {
return zkClientTimeout;
@@ -564,6 +572,7 @@ public class SolrZkClient {
}
public void close() {
+// log.warn("closed inst :"+inst, new Exception("leakdebug"));
if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true;
try {
Modified: lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Thu Jun 19 16:25:31 2014
@@ -98,10 +98,16 @@ public class ZkStateReader {
public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
public static final String SHARD_LEADERS_ZKNODE = "leaders";
+ private final Set<String> watchedCollections = new HashSet<String>();
+ /**These are collections which are actively watched by this instance .
+ *
+ */
+ private Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
+ private Set<String> allCollections = Collections.emptySet();
+
-
//
// convenience methods... should these go somewhere else?
//
@@ -162,7 +168,8 @@ public class ZkStateReader {
log.info("path={} {}={} specified config exists in ZooKeeper",
new Object[] {path, CONFIGNAME_PROP, configName});
}
-
+ } else {
+ throw new ZooKeeperException(ErrorCode.INVALID_STATE, "No config data found at path: " + path);
}
}
catch (KeeperException e) {
@@ -251,22 +258,21 @@ public class ZkStateReader {
return aliases;
}
- /*public Boolean checkValid(String coll, int version){
+ public Boolean checkValid(String coll, int version){
DocCollection collection = clusterState.getCollectionOrNull(coll);
if(collection ==null) return null;
- if(collection.getVersion() < version){
- log.info("server older than client {}<{}",collection.getVersion(),version);
- DocCollection nu = getExternCollectionFresh(this, coll);
- if(nu.getVersion()> collection.getVersion()){
- updateExternCollection(nu);
+ if(collection.getZNodeVersion() < version){
+ log.info("server older than client {}<{}",collection.getZNodeVersion(),version);
+ DocCollection nu = getCollectionLive(this, coll);
+ if(nu.getZNodeVersion()> collection.getZNodeVersion()){
+ updateWatchedCollection(nu);
collection = nu;
}
}
- if(collection.getVersion() == version) return Boolean.TRUE;
- log.info("wrong version from client {}!={} ",version, collection.getVersion());
+ if(collection.getZNodeVersion() == version) return Boolean.TRUE;
+ log.info("wrong version from client {}!={} ",version, collection.getZNodeVersion());
return Boolean.FALSE;
-
- }*/
+ }
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
InterruptedException {
@@ -299,10 +305,11 @@ public class ZkStateReader {
byte[] data = zkClient.getData(CLUSTER_STATE, thisWatch, stat ,
true);
Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
- ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln);
+ ClusterState clusterState = ClusterState.load(stat.getVersion(), data, ln,ZkStateReader.this, null);
// update volatile
ZkStateReader.this.clusterState = clusterState;
+ updateCollectionNames();
// HashSet<String> all = new HashSet<>(colls);;
// all.addAll(clusterState.getAllInternalCollections());
// all.remove(null);
@@ -377,6 +384,7 @@ public class ZkStateReader {
liveNodeSet.addAll(liveNodes);
ClusterState clusterState = ClusterState.load(zkClient, liveNodeSet, ZkStateReader.this);
this.clusterState = clusterState;
+ updateCollectionNames();
zkClient.exists(ALIASES,
new Watcher() {
@@ -422,6 +430,40 @@ public class ZkStateReader {
}, true);
}
updateAliases();
+ //on reconnect of SolrZkClient re-add watchers for the watched external collections
+ synchronized (this) {
+ for (String watchedCollection : watchedCollections) {
+ addZkWatch(watchedCollection);
+ }
+ }
+ }
+
+ public void updateCollectionNames() throws KeeperException, InterruptedException {
+ Set<String> colls = getExternColls();
+ colls.addAll(clusterState.getCollectionStates().keySet());
+ allCollections = Collections.unmodifiableSet(colls);
+ }
+
+ private Set<String> getExternColls() throws KeeperException, InterruptedException {
+ List<String> children = null;
+ try {
+ children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("Error fetching collection names");
+
+ return new HashSet<>();
+ }
+ if (children == null || children.isEmpty()) return new HashSet<>();
+ HashSet<String> result = new HashSet<>(children.size());
+
+ for (String c : children) {
+ try {
+ if (zkClient.exists(getCollectionPath(c), true)) result.add(c);
+ } catch (Exception e) {
+ log.warn("Error checking external collections", e);
+ }
+ }
+ return result;
}
@@ -449,6 +491,7 @@ public class ZkStateReader {
clusterState.setLiveNodes(liveNodesSet);
}
this.clusterState = clusterState;
+ updateCollectionNames();
}
} else {
@@ -507,9 +550,13 @@ public class ZkStateReader {
}
}, SOLRCLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
}
-
+ synchronized (this) {
+ for (String watchedCollection : watchedCollections) {
+ watchedCollectionStates.put(watchedCollection, getCollectionLive(this, watchedCollection));
+ }
+ }
}
-
+
/**
* @return information about the cluster from ZooKeeper
*/
@@ -632,6 +679,9 @@ public class ZkStateReader {
public SolrZkClient getZkClient() {
return zkClient;
}
+ public Set<String> getAllCollections(){
+ return allCollections;
+ }
public void updateAliases() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData(ALIASES, null, null, true);
@@ -678,4 +728,159 @@ public class ZkStateReader {
}
}
+ public void updateWatchedCollection(DocCollection c) {
+ if(watchedCollections.contains(c.getName())){
+ watchedCollectionStates.put(c.getName(), c);
+ log.info("Updated DocCollection "+c.getName()+" to: ");
+ }
+ }
+
+ /**
+ * <b>Advance usage</b>
+ * This method can be used to fetch a collection object and control whether it hits
+ * the cache only or if information can be looked up from ZooKeeper.
+ *
+ * @param coll the collection name
+ * @param cachedCopyOnly whether to fetch data from cache only or if hitting Zookeeper is acceptable
+ * @return the {@link org.apache.solr.common.cloud.DocCollection}
+ */
+ public DocCollection getCollection(String coll, boolean cachedCopyOnly) {
+ if(clusterState.getCollectionStates().get(coll) != null) {
+ //this collection resides in clusterstate.json. So it's always up-to-date
+ return clusterState.getCollectionStates().get(coll);
+ }
+ if (watchedCollections.contains(coll) || cachedCopyOnly) {
+ DocCollection c = watchedCollectionStates.get(coll);
+ if (c != null || cachedCopyOnly) return c;
+ }
+ return getCollectionLive(this, coll);
+ }
+ // this is only set by Overseer not to be set by others and only set inside the Overseer node. If Overseer has
+ // unfinished external collections which are yet to be persisted to ZK
+ // this map is populated and this class can use that information
+ public Map ephemeralCollectionData;
+
+ public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
+ String collectionPath = getCollectionPath(coll);
+ if(zkStateReader.ephemeralCollectionData !=null ){
+ ClusterState cs = (ClusterState) zkStateReader.ephemeralCollectionData.get(collectionPath);
+ if(cs !=null) {
+ return cs.getCollectionStates().get(coll);
+ }
+ }
+ try {
+ if (!zkStateReader.getZkClient().exists(collectionPath, true)) return null;
+ Stat stat = new Stat();
+ byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
+ ClusterState state = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), zkStateReader, collectionPath);
+ return state.getCollectionStates().get(coll);
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("No node available : " + collectionPath, e);
+ return null;
+ } catch (KeeperException e) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK:" + coll, e);
+ }
+ }
+
+ public DocCollection getCollection(String coll) {
+ return getCollection(coll, false);
+ }
+
+ public static String getCollectionPath(String coll) {
+ return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
+ }
+
+ public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
+ synchronized (this){
+ if(watchedCollections.contains(coll)) return;
+ else {
+ watchedCollections.add(coll);
+ }
+ addZkWatch(coll);
+ }
+
+ }
+
+ private void addZkWatch(final String coll) throws KeeperException, InterruptedException {
+ log.info("addZkWatch {}", coll);
+ final String fullpath = getCollectionPath(coll);
+ synchronized (getUpdateLock()){
+
+ cmdExecutor.ensureExists(fullpath, zkClient);
+ log.info("Updating collection state at {} from ZooKeeper... ",fullpath);
+
+ Watcher watcher = new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ // session events are not change events,
+ // and do not remove the watcher
+ if (EventType.None.equals(event.getType())) {
+ return;
+ }
+ log.info("A cluster state change: {}, has occurred - updating... ", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
+ try {
+
+ // delayed approach
+ // ZkStateReader.this.updateClusterState(false, false);
+ synchronized (ZkStateReader.this.getUpdateLock()) {
+ if(!watchedCollections.contains(coll)) {
+ log.info("Unwatched collection {}",coll);
+ return;
+ }
+ // remake watch
+ final Watcher thisWatch = this;
+ Stat stat = new Stat();
+ byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
+
+ if(data == null || data.length ==0){
+ log.warn("No value set for collection state : {}", coll);
+ return;
+
+ }
+ ClusterState clusterState = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(),ZkStateReader.this,fullpath);
+ // update volatile
+
+ DocCollection newState = clusterState.getCollectionStates().get(coll);
+ watchedCollectionStates.put(coll, newState);
+ log.info("Updating data for {} to ver {} ", coll , newState.getZNodeVersion());
+
+ }
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("Unwatched collection :"+coll , e);
+ throw new ZooKeeperException(ErrorCode.SERVER_ERROR,
+ "", e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Unwatched collection :"+coll , e);
+ return;
+ }
+ }
+
+ };
+ zkClient.exists(fullpath, watcher, true);
+ }
+
+ watchedCollectionStates.put(coll, getCollectionLive(this, coll));
+ }
+
+ /**This is not a public API. Only used by ZkController */
+ public void removeZKWatch(final String coll){
+ synchronized (this){
+ watchedCollections.remove(coll);
+ }
+ }
+
+
+
+
}
Modified: lucene/dev/branches/solr-5473/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java (original)
+++ lucene/dev/branches/solr-5473/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrServerTest.java Thu Jun 19 16:25:31 2014
@@ -20,7 +20,6 @@ package org.apache.solr.client.solrj.imp
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
import org.apache.solr.cloud.AbstractZkTestCase;
-import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -51,7 +49,6 @@ import org.apache.solr.common.cloud.DocC
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -62,7 +59,6 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,6 +121,7 @@ public class CloudSolrServerTest extends
@Override
public void doTest() throws Exception {
allTests();
+ stateVersionParamTest();
}
private void allTests() throws Exception {
@@ -346,7 +343,77 @@ public class CloudSolrServerTest extends
SolrInputDocument doc = getDoc(fields);
indexDoc(doc);
}
-
+
+ private void stateVersionParamTest() throws Exception {
+ CloudSolrServer client = createCloudClient(null);
+ try {
+ String collectionName = "checkStateVerCol";
+ createCollection(collectionName, client, 2, 2);
+ waitForRecoveriesToFinish(collectionName, false);
+ DocCollection coll = client.getZkStateReader().getClusterState().getCollection(collectionName);
+ Replica r = coll.getSlices().iterator().next().getReplicas().iterator().next();
+
+ HttpSolrServer httpSolrServer = new HttpSolrServer(r.getStr(ZkStateReader.BASE_URL_PROP) + "/"+collectionName);
+
+
+ SolrQuery q = new SolrQuery().setQuery("*:*");
+
+ log.info("should work query, result {}", httpSolrServer.query(q));
+ //no problem
+ q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
+ log.info("2nd query , result {}", httpSolrServer.query(q));
+ //no error yet good
+
+ q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+ (coll.getZNodeVersion() -1)); //an older version expect error
+
+ HttpSolrServer.RemoteSolrException sse = null;
+ try {
+ httpSolrServer.query(q);
+ log.info("expected query error");
+ } catch (HttpSolrServer.RemoteSolrException e) {
+ sse = e;
+ }
+ httpSolrServer.shutdown();
+ assertNotNull(sse);
+ assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
+
+ //now send the request to another node that does n ot serve the collection
+
+ Set<String> allNodesOfColl = new HashSet<>();
+ for (Slice slice : coll.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ allNodesOfColl.add(replica.getStr(ZkStateReader.BASE_URL_PROP));
+ }
+ }
+ String theNode = null;
+ for (String s : client.getZkStateReader().getClusterState().getLiveNodes()) {
+ String n = client.getZkStateReader().getBaseUrlForNodeName(s);
+ if(!allNodesOfColl.contains(s)){
+ theNode = n;
+ break;
+ }
+ }
+ log.info("thenode which does not serve this collection{} ",theNode);
+ assertNotNull(theNode);
+ httpSolrServer = new HttpSolrServer(theNode + "/"+collectionName);
+
+ q.setParam(CloudSolrServer.STATE_VERSION, collectionName+":"+coll.getZNodeVersion());
+
+ try {
+ httpSolrServer.query(q);
+ log.info("error was expected");
+ } catch (HttpSolrServer.RemoteSolrException e) {
+ sse = e;
+ }
+ httpSolrServer.shutdown();
+ assertNotNull(sse);
+ assertEquals(" Error code should be ", sse.code() , SolrException.ErrorCode.INVALID_STATE.code);
+ } finally {
+ client.shutdown();
+ }
+
+ }
+
public void testShutdown() throws MalformedURLException {
CloudSolrServer server = new CloudSolrServer("[ff01::114]:33332");
try {
Modified: lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/analysis/MockTokenFilterFactory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/analysis/MockTokenFilterFactory.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/analysis/MockTokenFilterFactory.java (original)
+++ lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/analysis/MockTokenFilterFactory.java Thu Jun 19 16:25:31 2014
@@ -62,4 +62,4 @@ public class MockTokenFilterFactory exte
public MockTokenFilter create(TokenStream stream) {
return new MockTokenFilter(stream, filter);
}
-}
\ No newline at end of file
+}
Modified: lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1603938&r1=1603937&r2=1603938&view=diff
==============================================================================
--- lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/branches/solr-5473/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Thu Jun 19 16:25:31 2014
@@ -338,6 +338,19 @@ public abstract class AbstractFullDistri
return createJettys(numJettys, false);
}
+ protected int defaultStateFormat = 1 + random().nextInt(2);
+
+ protected int getStateFormat() {
+ String stateFormat = System.getProperty("tests.solr.stateFormat", null);
+ if (stateFormat != null) {
+ if ("2".equals(stateFormat)) {
+ return defaultStateFormat = 2;
+ } else if ("1".equals(stateFormat)) {
+ return defaultStateFormat = 1;
+ }
+ }
+ return defaultStateFormat; // random
+ }
/**
* @param checkCreatedVsState
@@ -350,6 +363,17 @@ public abstract class AbstractFullDistri
List<SolrServer> clients = new ArrayList<>();
StringBuilder sb = new StringBuilder();
+ if(getStateFormat() == 2) {
+ log.info("Creating collection1 with stateFormat=2");
+ SolrZkClient zkClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT);
+ Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(ZkNodeProps.makeMap(
+ Overseer.QUEUE_OPERATION, OverseerCollectionProcessor.CREATECOLLECTION,
+ "name", DEFAULT_COLLECTION,
+ "numShards", String.valueOf(sliceCount),
+ DocCollection.STATE_FORMAT, getStateFormat())));
+ zkClient.close();
+ }
+
for (int i = 1; i <= numJettys; i++) {
if (sb.length() > 0) sb.append(',');
int cnt = this.jettyIntCntr.incrementAndGet();
@@ -1489,6 +1513,10 @@ public abstract class AbstractFullDistri
collectionInfos.put(collectionName, list);
}
params.set("name", collectionName);
+ if (getStateFormat() == 2) {
+ log.info("Creating collection with stateFormat=2: " + collectionName);
+ params.set(DocCollection.STATE_FORMAT, "2");
+ }
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");