You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/10 13:45:23 UTC
svn commit: r1695026 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/java/org/apache/solr/cloud/overseer/
solr/core/src/java/org/apache/solr/handler/admin/
solr/core/src/test/org/apache/s...
Author: shalin
Date: Mon Aug 10 11:45:22 2015
New Revision: 1695026
URL: http://svn.apache.org/r1695026
Log:
SOLR-5756: A utility Collection API to move a collection from stateFormat=1 to stateFormat=2
SOLR-7870: Write a test which asserts that requests to stateFormat=2 collection succeed on a node even after all local replicas of that collection have been removed
Added:
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
- copied unchanged from r1694692, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Mon Aug 10 11:45:22 2015
@@ -31,6 +31,13 @@ Detailed Change List
New Features
----------------------
+* SOLR-5756: A utility Collection API to move a collection from shared clusterstate.json (stateFormat=1,
+ default until 4.x) to the per-collection state.json stored in ZooKeeper (stateFormat=2,
+ default since 5.0) seamlessly without any application down-time.
+ Example:
+ http://localhost:8983/solr/admin/collections?action=MIGRATESTATEFORMAT&collection=<collection_name>
+ (Noble Paul, Scott Blum, shalin)
+
* SOLR-7219: filterCache access added to the solr query syntax.
Example: description:HDTV OR filter(+promotion:tv +promotion_date:[NOW/DAY TO NOW/DAY+7DAY])
(yonik)
@@ -50,6 +57,10 @@ Other Changes
* SOLR-7831: Start Scripts: Allow a configurable stack size [-Xss] (Steve Davids via Mark Miller)
+* SOLR-7870: Write a test which asserts that requests to stateFormat=2 collection succeed on a node
+ even after all local replicas of that collection have been removed.
+ (Scott Blum via shalin)
+
================== 5.3.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Mon Aug 10 11:45:22 2015
@@ -359,6 +359,8 @@ public class Overseer implements Closeab
case MODIFYCOLLECTION:
CollectionsHandler.verifyRuleParams(zkController.getCoreContainer() ,message.getProperties());
return new CollectionMutator(reader).modifyCollection(clusterState,message);
+ case MIGRATESTATEFORMAT:
+ return new ClusterStateMutator(reader).migrateStateFormat(clusterState, message);
default:
throw new RuntimeException("unknown operation:" + operation
+ " contents:" + message.getProperties());
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Mon Aug 10 11:45:22 2015
@@ -90,7 +90,6 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
@@ -113,10 +112,12 @@ import static org.apache.solr.common.par
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.util.StrUtils.formatString;
+import static org.apache.solr.common.util.Utils.makeMap;
public class OverseerCollectionMessageHandler implements OverseerMessageHandler {
@@ -273,6 +274,9 @@ public class OverseerCollectionMessageHa
case MODIFYCOLLECTION:
overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
break;
+ case MIGRATESTATEFORMAT:
+ migrateStateFormat(message, results);
+ break;
default:
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
@@ -807,6 +811,36 @@ public class OverseerCollectionMessageHa
}
}
+ private void migrateStateFormat(ZkNodeProps message, NamedList results)
+ throws KeeperException, InterruptedException {
+ final String collectionName = message.getStr(COLLECTION_PROP);
+
+ // wait for a while until the state format changes
+ long now = System.nanoTime();
+ long timeout = now + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+ boolean firstLoop = true;
+ while (System.nanoTime() < timeout) {
+ DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+ if (collection == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " not found");
+ }
+ if (collection.getStateFormat() == 2) {
+ // Done.
+ results.add("success", new SimpleOrderedMap<>());
+ return;
+ }
+
+ if (firstLoop) {
+ // Actually queue the migration command.
+ firstLoop = false;
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+ }
+ Thread.sleep(100);
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
+ }
+
private void createAlias(Aliases aliases, ZkNodeProps message) {
String aliasName = message.getStr(NAME);
String collections = message.getStr("collections");
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Mon Aug 10 11:45:22 2015
@@ -17,8 +17,6 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
-import static org.apache.solr.common.cloud.ZkStateReader.*;
-
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
@@ -46,6 +44,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
@@ -98,7 +97,13 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Strings;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
/**
* Handle ZooKeeper interactions.
@@ -1524,8 +1529,8 @@ public final class ZkController {
publish(cd, Replica.State.DOWN, false, true);
DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
- if(collection !=null && collection.getStateFormat()>1 ){
- log.info("Registering watch for external collection {}",cd.getCloudDescriptor().getCollectionName());
+ if (collection != null) {
+ log.info("Registering watch for external collection {}", cd.getCloudDescriptor().getCollectionName());
zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
}
} catch (KeeperException e) {
@@ -2359,7 +2364,7 @@ public final class ZkController {
if (Event.EventType.None.equals(event.getType())) {
log.info("A node got unwatched for {}", zkDir);
} else {
- if(resetWatcher) setConfWatcher(zkDir,this,stat);
+ if(resetWatcher) setConfWatcher(zkDir, this, stat);
}
}
}
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java Mon Aug 10 11:45:22 2015
@@ -184,5 +184,16 @@ public class ClusterStateMutator {
}
return null;
}
+
+ public ZkWriteCommand migrateStateFormat(ClusterState clusterState, ZkNodeProps message) {
+ final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
+ if (!CollectionMutator.checkKeyExistence(message, ZkStateReader.COLLECTION_PROP)) return ZkStateWriter.NO_OP;
+ DocCollection coll = clusterState.getCollectionOrNull(collection);
+ if (coll == null || coll.getStateFormat() == 2) return ZkStateWriter.NO_OP;
+
+ return new ZkWriteCommand(coll.getName(),
+ new DocCollection(coll.getName(), coll.getSlicesMap(), coll.getProperties(), coll.getRouter(), 0,
+ ZkStateReader.getCollectionPath(collection)));
+ }
}
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Mon Aug 10 11:45:22 2015
@@ -700,6 +700,13 @@ public class CollectionsHandler extends
verifyRuleParams(h.coreContainer, m);
return m;
}
+ },
+ MIGRATESTATEFORMAT_OP(MIGRATESTATEFORMAT) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+ throws Exception {
+ return req.getParams().required().getAll(null, COLLECTION_PROP);
+ }
};
CollectionAction action;
long timeOut;
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java Mon Aug 10 11:45:22 2015
@@ -18,12 +18,21 @@ package org.apache.solr.cloud;
*/
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
import com.google.common.collect.Lists;
+import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
@@ -36,13 +45,6 @@ import org.apache.solr.common.util.Named
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
public class TestCollectionAPI extends ReplicaPropertiesBase {
@@ -77,6 +79,7 @@ public class TestCollectionAPI extends R
clusterStatusRolesTest();
replicaPropTest();
clusterStatusZNodeVersion();
+ testClusterStateMigration();
}
private void clusterStatusWithCollectionAndShard() throws IOException, SolrServerException {
@@ -591,6 +594,41 @@ public class TestCollectionAPI extends R
}
}
+ private void testClusterStateMigration() throws Exception {
+ try (CloudSolrClient client = createCloudClient(null)) {
+ client.connect();
+
+ new CollectionAdminRequest.Create()
+ .setCollectionName("testClusterStateMigration")
+ .setNumShards(1)
+ .setReplicationFactor(1)
+ .setConfigName("conf1")
+ .setStateFormat(1)
+ .process(client);
+
+ waitForRecoveriesToFinish("testClusterStateMigration", true);
+
+ assertEquals(1, client.getZkStateReader().getClusterState().getCollection("testClusterStateMigration").getStateFormat());
+
+ for (int i = 0; i < 10; i++) {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "id_" + i);
+ client.add("testClusterStateMigration", doc);
+ }
+ client.commit("testClusterStateMigration");
+
+ new CollectionAdminRequest.MigrateClusterState()
+ .setCollectionName("testClusterStateMigration")
+ .process(client);
+
+ client.getZkStateReader().updateClusterState();
+
+ assertEquals(2, client.getZkStateReader().getClusterState().getCollection("testClusterStateMigration").getStateFormat());
+
+ QueryResponse response = client.query("testClusterStateMigration", new SolrQuery("*:*"));
+ assertEquals(10, response.getResults().getNumFound());
+ }
+ }
// Expects the map will have keys, but blank values.
private Map<String, String> getProps(CloudSolrClient client, String collectionName, String replicaName, String... props)
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java Mon Aug 10 11:45:22 2015
@@ -120,6 +120,47 @@ public class ZkStateWriterTest extends S
}
}
+ public void testSingleLegacyCollection() throws Exception {
+ String zkDir = createTempDir("testSingleLegacyCollection").toFile().getAbsolutePath();
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ SolrZkClient zkClient = null;
+
+ try {
+ server.run();
+ AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+ AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+ zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+ ZkController.createClusterZkNodes(zkClient);
+
+ ZkStateReader reader = new ZkStateReader(zkClient);
+ reader.createClusterStateWatchersAndUpdate();
+
+ ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+ // create new collection with stateFormat = 1
+ ZkWriteCommand c1 = new ZkWriteCommand("c1",
+ new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+
+ ClusterState clusterState = writer.enqueueUpdate(reader.getClusterState(), c1, null);
+ writer.writePendingUpdates();
+
+ Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
+ assertNotNull(map.get("c1"));
+ boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
+ assertFalse(exists);
+
+ } finally {
+ IOUtils.close(zkClient);
+ server.shutdown();
+
+ }
+ }
+
public void testSingleExternalCollection() throws Exception{
String zkDir = createTempDir("testSingleExternalCollection").toFile().getAbsolutePath();
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Mon Aug 10 11:45:22 2015
@@ -1033,6 +1033,29 @@ public abstract class CollectionAdminReq
return this;
}
}
+
+ // MIGRATESTATEFORMAT request
+ public static class MigrateClusterState extends CollectionShardAdminRequest<MigrateClusterState> {
+
+ public MigrateClusterState() {
+ this.action = CollectionAction.MIGRATESTATEFORMAT;
+ }
+
+ @Override
+ public MigrateClusterState setShardName(String shard) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getShardName() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected MigrateClusterState getThis() {
+ return this;
+ }
+ }
// BALANCESHARDUNIQUE request
public static class BalanceShardUnique extends CollectionAdminRequest<BalanceShardUnique> {
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Mon Aug 10 11:45:22 2015
@@ -335,7 +335,9 @@ public class ClusterState implements JSO
* The version of clusterstate.json in ZooKeeper.
*
* @return null if ClusterState was created for publication, not consumption
+ * @deprecated true cluster state spans many ZK nodes, stop depending on the version number of the shared node!
*/
+ @Deprecated
public Integer getZkClusterStateVersion() {
return znodeVersion;
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Mon Aug 10 11:45:22 2015
@@ -23,6 +23,7 @@ import java.io.UnsupportedEncodingExcept
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -48,6 +49,8 @@ import org.slf4j.LoggerFactory;
import static java.util.Arrays.asList;
import static java.util.Collections.EMPTY_MAP;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;
import static org.apache.solr.common.util.Utils.fromJSON;
@@ -93,7 +96,8 @@ public class ZkStateReader implements Cl
public static final String LEGACY_CLOUD = "legacyCloud";
public static final String URL_SCHEME = "urlScheme";
-
+
+ /** A view of the current state of all collections; combines all the different state sources into a single view. */
protected volatile ClusterState clusterState;
private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
@@ -104,12 +108,21 @@ public class ZkStateReader implements Cl
public static final String SHARD_LEADERS_ZKNODE = "leaders";
public static final String ELECTION_NODE = "election";
- private final Set<String> watchedCollections = new HashSet<String>();
+ /** Collections we actively care about, and will try to keep watch on. */
+ private final Set<String> interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- /**These are collections which are actively watched by this instance .
- *
- */
- private Map<String , DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
+ /** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */
+ private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();
+ /** Last seen ZK version of clusterstate.json. */
+ private int legacyClusterStateVersion = 0;
+
+ /** Collections with format2 state.json, "interesting" and actively watched. */
+ private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
+
+ /** Collections with format2 state.json, not "interesting" and not actively watched. */
+ private volatile Map<String, ClusterState.CollectionRef> lazyCollectionStates = new HashMap<>();
+
+ private volatile Set<String> liveNodes = emptySet();
private final ZkConfigManager configManager;
@@ -183,8 +196,6 @@ public class ZkStateReader implements Cl
private final boolean closeClient;
- private final ZkCmdExecutor cmdExecutor;
-
private volatile Aliases aliases = new Aliases();
private volatile boolean closed = false;
@@ -195,7 +206,6 @@ public class ZkStateReader implements Cl
public ZkStateReader(SolrZkClient zkClient, Runnable securityNodeListener) {
this.zkClient = zkClient;
- this.cmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
this.configManager = new ZkConfigManager(zkClient);
this.closeClient = false;
this.securityNodeListener = securityNodeListener;
@@ -223,7 +233,6 @@ public class ZkStateReader implements Cl
}
}
});
- this.cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
this.configManager = new ZkConfigManager(zkClient);
this.closeClient = true;
this.securityNodeListener = null;
@@ -232,15 +241,32 @@ public class ZkStateReader implements Cl
public ZkConfigManager getConfigManager() {
return configManager;
}
-
- // load and publish a new CollectionInfo
+
+ /**
+ * Forcibly refresh cluster state from ZK. Do this only to avoid race conditions because it's expensive.
+ */
public void updateClusterState() throws KeeperException, InterruptedException {
- updateClusterState(false);
+ synchronized (getUpdateLock()) {
+ if (clusterState == null) {
+ // Never initialized, just run normal initialization.
+ createClusterStateWatchersAndUpdate();
+ return;
+ }
+ // No need to set watchers because we should already have watchers registered for everything.
+ refreshLegacyClusterState(null);
+ for (String coll : watchedCollectionStates.keySet()) {
+ DocCollection newState = fetchCollectionState(coll, null);
+ updateWatchedCollection(coll, newState);
+ }
+ refreshLazyFormat2Collections(true);
+ refreshLiveNodes(null);
+ constructState();
+ }
}
-
- // load and publish a new CollectionInfo
+
+ /** Refresh the set of live nodes. */
public void updateLiveNodes() throws KeeperException, InterruptedException {
- updateClusterState(true);
+ refreshLiveNodes(null);
}
public Aliases getAliases() {
@@ -255,7 +281,7 @@ public class ZkStateReader implements Cl
DocCollection nu = getCollectionLive(this, coll);
if (nu == null) return -1 ;
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
- updateWatchedCollection(nu);
+ updateWatchedCollection(coll, nu);
collection = nu;
}
}
@@ -272,105 +298,23 @@ public class ZkStateReader implements Cl
public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
InterruptedException {
// We need to fetch the current cluster state and the set of live nodes
-
- synchronized (getUpdateLock()) {
- log.info("Updating cluster state from ZooKeeper... ");
-
- Stat stat = zkClient.exists(CLUSTER_STATE, new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- // session events are not change events,
- // and do not remove the watcher
- if (EventType.None.equals(event.getType())) {
- return;
- }
- log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event) , ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
- try {
-
- // delayed approach
- // ZkStateReader.this.updateClusterState(false, false);
- synchronized (ZkStateReader.this.getUpdateLock()) {
- // remake watch
- final Watcher thisWatch = this;
- Set<String> ln = ZkStateReader.this.clusterState.getLiveNodes();
- // update volatile
- ZkStateReader.this.clusterState = constructState(ln, thisWatch);
- }
- log.info("Updated cluster state version to " + ZkStateReader.this.clusterState.getZkClusterStateVersion());
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("", e);
- return;
- }
- }
-
- }, true);
+ log.info("Updating cluster state from ZooKeeper... ");
- if (stat == null)
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
+ // Sanity check ZK structure.
+ if (!zkClient.exists(CLUSTER_STATE, true)) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+ "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
}
-
-
- synchronized (ZkStateReader.this.getUpdateLock()) {
- List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
- new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- // session events are not change events,
- // and do not remove the watcher
- if (EventType.None.equals(event.getType())) {
- return;
- }
- try {
- // delayed approach
- // ZkStateReader.this.updateClusterState(false, true);
- synchronized (ZkStateReader.this.getUpdateLock()) {
- List<String> liveNodes = zkClient.getChildren(
- LIVE_NODES_ZKNODE, this, true);
- log.debug("Updating live nodes... ({})", liveNodes.size());
- Set<String> liveNodesSet = new HashSet<>();
- liveNodesSet.addAll(liveNodes);
- ClusterState clusterState = ZkStateReader.this.clusterState;
+ // on reconnect of SolrZkClient force refresh and re-add watches.
+ refreshLegacyClusterState(new LegacyClusterStateWatcher());
+ refreshStateFormat2Collections();
+ refreshLazyFormat2Collections(true);
+ refreshLiveNodes(new LiveNodeWatcher());
- clusterState.setLiveNodes(liveNodesSet);
- }
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("", e);
- return;
- }
- }
-
- }, true);
-
- Set<String> liveNodeSet = new HashSet<>();
- liveNodeSet.addAll(liveNodes);
- this.clusterState = constructState(liveNodeSet, null);
+ synchronized (ZkStateReader.this.getUpdateLock()) {
+ constructState();
zkClient.exists(ALIASES,
new Watcher() {
@@ -416,12 +360,7 @@ public class ZkStateReader implements Cl
}, true);
}
updateAliases();
- //on reconnect of SolrZkClient re-add watchers for the watched external collections
- synchronized (this) {
- for (String watchedCollection : watchedCollections) {
- addZkWatch(watchedCollection);
- }
- }
+
if (securityNodeListener != null) {
addSecuritynodeWatcher(SOLR_SECURITY_CONF_PATH, new Callable<Pair<byte[], Stat>>() {
@Override
@@ -431,7 +370,6 @@ public class ZkStateReader implements Cl
cd.version = pair.getValue() == null ? -1 : pair.getValue().getVersion();
securityData = cd;
securityNodeListener.run();
-
}
});
}
@@ -505,89 +443,164 @@ public class ZkStateReader implements Cl
}, true);
}
- private ClusterState constructState(Set<String> ln, Watcher watcher) throws KeeperException, InterruptedException {
- Stat stat = new Stat();
- byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
- ClusterState loadedData = ClusterState.load(stat.getVersion(), data, ln, CLUSTER_STATE);
-
- // first load all collections in /clusterstate.json (i.e. stateFormat=1)
- Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(loadedData.getCollectionStates());
-
- for (String s : getStateFormat2CollectionNames()) {
- synchronized (this) {
- if (watchedCollections.contains(s)) {
- DocCollection live = getCollectionLive(this, s);
- if (live != null) {
- updateWatchedCollection(live);
- // if it is a watched collection, add too
- result.put(s, new ClusterState.CollectionRef(live));
- }
- } else {
- // if it is not collection, then just create a reference which can fetch
- // the collection object just in time from ZK
- // this is also cheap (lazy loaded) so we put it inside the synchronized
- // block although it is not required
- final String collName = s;
- result.put(s, new ClusterState.CollectionRef(null) {
- @Override
- public DocCollection get() {
- return getCollectionLive(ZkStateReader.this, collName);
- }
- @Override
- public boolean isLazilyLoaded() {
- return true;
- }
- });
- }
+ /**
+ * Construct the total state view from all sources.
+ * Must hold {@link #getUpdateLock()} before calling this.
+ */
+ private void constructState() {
+ // Legacy clusterstate is authoritative, for backwards compatibility.
+ // To move a collection's state to format2, first create the new format2 state node, then remove the legacy entry.
+ Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);
+
+ // Are there any interesting collections that disappeared from the legacy cluster state?
+ for (String coll : interestingCollections) {
+ if (!result.containsKey(coll) && !watchedCollectionStates.containsKey(coll)) {
+ new StateWatcher(coll).refreshAndWatch(true);
+ }
+ }
+
+ // Add state format2 collections, but don't override legacy collection states.
+ for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
+ if (!result.containsKey(entry.getKey())) {
+ result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
+ }
+ }
+
+ // Finally, add any lazy collections that aren't already accounted for.
+ for (Map.Entry<String, ClusterState.CollectionRef> entry : lazyCollectionStates.entrySet()) {
+ if (!result.containsKey(entry.getKey())) {
+ result.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
+ }
+
+ /**
+ * Refresh legacy (shared) clusterstate.json
+ */
+ private void refreshLegacyClusterState(Watcher watcher)
+ throws KeeperException, InterruptedException {
+ try {
+ Stat stat = new Stat();
+ byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
+ ClusterState loadedData = ClusterState.load(stat.getVersion(), data, Collections.<String>emptySet(), CLUSTER_STATE);
+ synchronized (getUpdateLock()) {
+ this.legacyCollectionStates = loadedData.getCollectionStates();
+ this.legacyClusterStateVersion = stat.getVersion();
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // Ignore missing legacy clusterstate.json.
+ synchronized (getUpdateLock()) {
+ this.legacyCollectionStates = emptyMap();
+ this.legacyClusterStateVersion = 0;
}
}
- return new ClusterState(ln, result, stat.getVersion());
}
+ /**
+ * Refresh state format2 collections.
+ */
+ private void refreshStateFormat2Collections() {
+ // It's okay if no format2 state.json exists, if one did not previously exist.
+ for (String coll : interestingCollections) {
+ new StateWatcher(coll).refreshAndWatch(watchedCollectionStates.containsKey(coll));
+ }
+ }
- private Set<String> getStateFormat2CollectionNames() throws KeeperException, InterruptedException {
+ /**
+ * Search for any lazy-loadable state format2 collections.
+ */
+ private void refreshLazyFormat2Collections(boolean fullRefresh) throws KeeperException, InterruptedException {
List<String> children = null;
try {
children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
} catch (KeeperException.NoNodeException e) {
log.warn("Error fetching collection names");
-
- return new HashSet<>();
+ // fall through
+ }
+ if (children == null || children.isEmpty()) {
+ synchronized (getUpdateLock()) {
+ this.lazyCollectionStates = new HashMap<>();
+ }
+ return;
}
- if (children == null || children.isEmpty()) return new HashSet<>();
- HashSet<String> result = new HashSet<>(children.size(), 1.0f);
- for (String c : children) {
- try {
- // this exists call is necessary because we only want to return
- // those collections which have their own state.json.
- // The getCollectionPath() calls returns the complete path to the
- // collection's state.json
- if (zkClient.exists(getCollectionPath(c), true)) {
- result.add(c);
+ Map<String, ClusterState.CollectionRef> result = new HashMap<>();
+ for (String collName : children) {
+ if (interestingCollections.contains(collName)) {
+ // We will create an eager collection for any interesting collections.
+ continue;
+ }
+
+ if (!fullRefresh) {
+ // Try to use an already-created lazy collection if it's not a full refresh.
+ ClusterState.CollectionRef existing = lazyCollectionStates.get(collName);
+ if (existing != null) {
+ result.put(collName, existing);
+ continue;
}
- } catch (Exception e) {
- log.warn("Error reading collections nodes", e);
+ }
+
+ ClusterState.CollectionRef lazyCollectionState = tryMakeLazyCollectionStateFormat2(collName);
+ if (lazyCollectionState != null) {
+ result.put(collName, lazyCollectionState);
}
}
- return result;
- }
- // load and publish a new CollectionInfo
- private void updateClusterState(boolean onlyLiveNodes) throws KeeperException, InterruptedException {
- // build immutable CloudInfo
synchronized (getUpdateLock()) {
- List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null, true);
- Set<String> liveNodesSet = new HashSet<>(liveNodes);
+ this.lazyCollectionStates = result;
+ }
+ }
- if (!onlyLiveNodes) {
- log.debug("Updating cloud state from ZooKeeper... ");
- clusterState = constructState(liveNodesSet, null);
- } else {
- log.debug("Updating live nodes from ZooKeeper... ({})", liveNodesSet.size());
- clusterState = this.clusterState;
- clusterState.setLiveNodes(liveNodesSet);
+ private ClusterState.CollectionRef tryMakeLazyCollectionStateFormat2(final String collName) {
+ boolean exists = false;
+ try {
+ exists = zkClient.exists(getCollectionPath(collName), true);
+ } catch (Exception e) {
+ log.warn("Error reading collections nodes", e);
+ }
+ if (!exists) {
+ return null;
+ }
+
+ // state.json exists for this collection: just create a reference which can fetch
+ // the collection object just in time from ZK
+ return new ClusterState.CollectionRef(null) {
+ @Override
+ public DocCollection get() {
+ return getCollectionLive(ZkStateReader.this, collName);
+ }
+
+ @Override
+ public boolean isLazilyLoaded() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "lazy DocCollection(" + collName + ")";
+ }
+ };
+ }
+
+ /**
+ * Refresh live_nodes.
+ */
+ private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
+ Set<String> newLiveNodes;
+ try {
+ List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
+ log.debug("Updating live nodes from ZooKeeper... ({})", nodeList.size());
+ newLiveNodes = new HashSet<>(nodeList);
+ } catch (KeeperException.NoNodeException e) {
+ newLiveNodes = emptySet();
+ }
+ synchronized (getUpdateLock()) {
+ this.liveNodes = newLiveNodes;
+ if (clusterState != null) {
+ clusterState.setLiveNodes(newLiveNodes);
}
}
}
@@ -610,14 +623,6 @@ public class ZkStateReader implements Cl
}
}
- abstract class RunnableWatcher implements Runnable {
- Watcher watcher;
- public RunnableWatcher(Watcher watcher){
- this.watcher = watcher;
- }
-
- }
-
public String getLeaderUrl(String collection, String shard, int timeout)
throws InterruptedException, KeeperException {
ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection,
@@ -852,19 +857,152 @@ public class ZkStateReader implements Cl
}
}
+ /** Watches a single collection's format2 state.json. */
+ class StateWatcher implements Watcher {
+ private final String coll;
+
+ StateWatcher(String coll) {
+ this.coll = coll;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (!interestingCollections.contains(coll)) {
+ // This collection is no longer interesting, stop watching.
+ log.info("Uninteresting collection {}", coll);
+ return;
+ }
+
+ // session events are not change events,
+ // and do not remove the watcher
+ if (EventType.None.equals(event.getType())) {
+ return;
+ }
+
+ log.info("A cluster state change: {} for collection {} has occurred - updating... (live nodes size: {})",
+ (event), coll, ZkStateReader.this.clusterState == null ? 0
+ : ZkStateReader.this.clusterState.getLiveNodes().size());
+
+ refreshAndWatch(true);
+ synchronized (getUpdateLock()) {
+ constructState();
+ }
+ }
+
+ /**
+ * Refresh collection state from ZK and leave a watch for future changes.
+ * As a side effect, updates {@link #watchedCollectionStates} with the results of the refresh;
+ * {@link #clusterState} is not rebuilt here, callers rebuild it via {@code constructState()}.
+ *
+ * @param expectExists if true, a missing state node (NoNodeException) is logged as a warning
+ */
+ public void refreshAndWatch(boolean expectExists) {
+ try {
+ DocCollection newState = fetchCollectionState(coll, this);
+ updateWatchedCollection(coll, newState);
+ } catch (KeeperException.NoNodeException e) {
+ if (expectExists) {
+ log.warn("State node vanished for collection: " + coll, e);
+ }
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("Unwatched collection: " + coll, e);
+ throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Unwatched collection :" + coll, e);
+ }
+ }
+ }
+
+ /** Watches the legacy clusterstate.json. */
+ class LegacyClusterStateWatcher implements Watcher {
+
+ @Override
+ public void process(WatchedEvent event) {
+ // session events are not change events,
+ // and do not remove the watcher
+ if (EventType.None.equals(event.getType())) {
+ return;
+ }
+ log.info("A cluster state change: {}, has occurred - updating... (live nodes size: {})", (event), ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size());
+ refreshAndWatch();
+ synchronized (getUpdateLock()) {
+ constructState();
+ }
+ }
+
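+ /** Refreshes the legacy cluster state and lazy format2 collections, re-registering this watcher; {@link #getUpdateLock()} is acquired internally when results are stored. */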
+ public void refreshAndWatch() {
+ try {
+ refreshLegacyClusterState(this);
+ // Changes to clusterstate.json signal global state changes.
+ // TODO: get rid of clusterstate.json as a signaling mechanism.
+ refreshLazyFormat2Collections(false);
+ } catch (KeeperException.NoNodeException e) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+ "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.warn("", e);
+ }
+ }
+ }
+
+ /** Watches the live_nodes and syncs changes. */
+ class LiveNodeWatcher implements Watcher {
+
+ @Override
+ public void process(WatchedEvent event) {
+ // session events are not change events,
+ // and do not remove the watcher
+ if (EventType.None.equals(event.getType())) {
+ return;
+ }
+ log.info("A live node change: {}, has occurred - updating... (live nodes size: {})", (event), liveNodes.size());
+ refreshAndWatch();
+ }
+
+ /** Refreshes live_nodes and re-registers this watcher; {@link #getUpdateLock()} is acquired internally when the result is stored. */
+ public void refreshAndWatch() {
+ try {
+ refreshLiveNodes(this);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.warn("", e);
+ }
+ }
+ }
+
public static DocCollection getCollectionLive(ZkStateReader zkStateReader,
String coll) {
- String collectionPath = getCollectionPath(coll);
try {
- Stat stat = new Stat();
- byte[] data = zkStateReader.getZkClient().getData(collectionPath, null, stat, true);
- ClusterState state = ClusterState.load(stat.getVersion(), data,
- Collections.<String> emptySet(), collectionPath);
- ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
- return collectionRef == null ? null : collectionRef.get();
- } catch (KeeperException.NoNodeException e) {
- log.warn("No node available : " + collectionPath, e);
- return null;
+ return zkStateReader.fetchCollectionState(coll, null);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Could not load collection from ZK:" + coll, e);
@@ -875,113 +1013,71 @@ public class ZkStateReader implements Cl
}
}
+ private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
+ String collectionPath = getCollectionPath(coll);
+ try {
+ Stat stat = new Stat();
+ byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
+ ClusterState state = ClusterState.load(stat.getVersion(), data,
+ Collections.<String>emptySet(), collectionPath);
+ ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
+ return collectionRef == null ? null : collectionRef.get();
+ } catch (KeeperException.NoNodeException e) {
+ return null;
+ }
+ }
+
public static String getCollectionPath(String coll) {
return COLLECTIONS_ZKNODE+"/"+coll + "/state.json";
}
public void addCollectionWatch(String coll) throws KeeperException, InterruptedException {
- synchronized (this) {
- if (watchedCollections.contains(coll)) return;
- else {
- watchedCollections.add(coll);
+ if (interestingCollections.add(coll)) {
+ log.info("addZkWatch {}", coll);
+ new StateWatcher(coll).refreshAndWatch(false);
+ synchronized (getUpdateLock()) {
+ constructState();
}
- addZkWatch(coll);
}
}
- private void addZkWatch(final String coll) throws KeeperException,
- InterruptedException {
- log.info("addZkWatch {}", coll);
- final String fullpath = getCollectionPath(coll);
- synchronized (getUpdateLock()) {
-
- cmdExecutor.ensureExists(fullpath, zkClient);
- log.info("Updating collection state at {} from ZooKeeper... ", fullpath);
-
- Watcher watcher = new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- // session events are not change events,
- // and do not remove the watcher
- if (EventType.None.equals(event.getType())) {
- return;
- }
- log.info("A cluster state change: {} for collection {} has occurred - updating... (live nodes size: {})",
- (event), coll, ZkStateReader.this.clusterState == null ? 0
- : ZkStateReader.this.clusterState.getLiveNodes().size());
- try {
-
- // delayed approach
- // ZkStateReader.this.updateClusterState(false, false);
- synchronized (ZkStateReader.this.getUpdateLock()) {
- if (!watchedCollections.contains(coll)) {
- log.info("Unwatched collection {}", coll);
- return;
- }
- // remake watch
- final Watcher thisWatch = this;
- Stat stat = new Stat();
- byte[] data = zkClient.getData(fullpath, thisWatch, stat, true);
-
- if (data == null || data.length == 0) {
- log.warn("No value set for collection state : {}", coll);
- return;
-
- }
- ClusterState clusterState = ClusterState.load(stat.getVersion(),
- data, Collections.<String> emptySet(), fullpath);
- // update volatile
-
- DocCollection newState = clusterState.getCollectionStates()
- .get(coll).get();
- updateWatchedCollection(newState);
-
- }
- } catch (KeeperException e) {
- if (e.code() == KeeperException.Code.SESSIONEXPIRED
- || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("Unwatched collection :" + coll, e);
- throw new ZooKeeperException(ErrorCode.SERVER_ERROR, "", e);
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Unwatched collection :" + coll, e);
- return;
- }
+ private void updateWatchedCollection(String coll, DocCollection newState) {
+ if (newState == null) {
+ log.info("Deleting data for {}", coll);
+ watchedCollectionStates.remove(coll);
+ return;
+ }
+
+ log.info("Updating data for {} to ver {} ", coll, newState.getZNodeVersion());
+ // CAS update loop
+ while (true) {
+ DocCollection oldState = watchedCollectionStates.get(coll);
+ if (oldState == null) {
+ if (watchedCollectionStates.putIfAbsent(coll, newState) == null) {
+ break;
+ }
+ } else {
+ if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
+ // Nothing to do, someone else updated same or newer.
+ return;
}
-
- };
- zkClient.exists(fullpath, watcher, true);
- }
- DocCollection collection = getCollectionLive(this, coll);
- if (collection != null) {
- updateWatchedCollection(collection);
+ if (watchedCollectionStates.replace(coll, oldState, newState)) {
+ break;
+ }
+ }
}
}
- private void updateWatchedCollection(DocCollection newState) {
- watchedCollectionStates.put(newState.getName(), newState);
- log.info("Updating data for {} to ver {} ", newState.getName(), newState.getZNodeVersion());
- this.clusterState = clusterState.copyWith(newState.getName(), newState);
- }
-
/** This is not a public API. Only used by ZkController */
- public void removeZKWatch(final String coll) {
- synchronized (this) {
- watchedCollections.remove(coll);
- watchedCollectionStates.remove(coll);
- try {
- updateClusterState();
- } catch (KeeperException e) {
- log.error("Error updating state",e);
- } catch (InterruptedException e) {
- log.error("Error updating state",e);
- Thread.currentThread().interrupt();
+ public void removeZKWatch(String coll) {
+ interestingCollections.remove(coll);
+ watchedCollectionStates.remove(coll);
+ ClusterState.CollectionRef lazyCollectionStateFormat2 = tryMakeLazyCollectionStateFormat2(coll);
+ synchronized (getUpdateLock()) {
+ if (lazyCollectionStateFormat2 != null) {
+ this.lazyCollectionStates.put(coll, lazyCollectionStateFormat2);
}
+ constructState();
}
}
@@ -998,5 +1094,4 @@ public class ZkStateReader implements Cl
}
}
-
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1695026&r1=1695025&r2=1695026&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Mon Aug 10 11:45:22 2015
@@ -19,6 +19,8 @@ package org.apache.solr.common.params;
import java.util.Locale;
+import org.apache.solr.common.SolrException;
+
public interface CollectionParams
{
/** What action **/
@@ -51,7 +53,8 @@ public interface CollectionParams
DELETEREPLICAPROP(true),
BALANCESHARDUNIQUE(true),
REBALANCELEADERS(true),
- MODIFYCOLLECTION(true);
+ MODIFYCOLLECTION(true),
+ MIGRATESTATEFORMAT(true);
public final boolean isWrite;