You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/10/15 00:36:36 UTC
svn commit: r1183538 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/
core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/client/solrj/impl/
solrj/src/java/org/apache/solr/common/cloud/
Author: markrmiller
Date: Fri Oct 14 22:36:35 2011
New Revision: 1183538
URL: http://svn.apache.org/viewvc?rev=1183538&view=rev
Log:
initial work for SOLR-2765: Shard/Node states and SOLR-2821: Improve how cluster state is managed in ZooKeeper.
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/AssignShard.java Fri Oct 14 22:36:35 2011
@@ -17,12 +17,15 @@ package org.apache.solr.cloud;
* the License.
*/
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.KeeperException;
@@ -51,18 +54,23 @@ public class AssignShard {
lock.lock();
String returnShardId = null;
try {
- // lets read the current shards - we want to read straight from zk, and we
- // assume we have some kind
+ // lets read the current shards - we want to read straight from zk (we
+ // need the absolute latest info), and we assume we have some kind
// of collection level lock
- String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
- + ZkStateReader.SHARDS_ZKNODE;
- List<String> shardIdNames = client.getChildren(shardIdPaths, null);
+ // TODO: this made a lot more sense when the cluster state was on multiple nodes
+ // and it was just a single getChildren read.
+
+ CloudState state = CloudState.load(client.getData(ZkStateReader.CLUSTER_STATE, null, null));
+ Map<String, Slice> sliceMap = state.getSlices(collection);
- if (shardIdNames.size() == 0) {
+ if (sliceMap == null) {
return "shard1";
}
+ List<String> shardIdNames = new ArrayList<String>(sliceMap.keySet());
+
+
if (shardIdNames.size() < slices) {
return "shard" + (shardIdNames.size() + 1);
}
@@ -70,7 +78,7 @@ public class AssignShard {
// else figure out which shard needs more replicas
final Map<String,Integer> map = new HashMap<String,Integer>();
for (String shardId : shardIdNames) {
- int cnt = client.getChildren(shardIdPaths + "/" + shardId, null).size();
+ int cnt = sliceMap.get(shardId).getShards().size();
map.put(shardId, cnt);
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Fri Oct 14 22:36:35 2011
@@ -24,7 +24,8 @@ public class CloudDescriptor {
private String shardId;
private String collectionName;
private SolrParams params;
-
+ private String roles;
+
public void setShardId(String shardId) {
this.shardId = shardId;
}
@@ -41,6 +42,14 @@ public class CloudDescriptor {
this.collectionName = collectionName;
}
+ public String getRoles(){
+ return roles;
+ }
+
+ public void setRoles(String roles){
+ this.roles = roles;
+ }
+
/** Optional parameters that can change how a core is created. */
public SolrParams getParams() {
return params;
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Oct 14 22:36:35 2011
@@ -19,9 +19,12 @@ package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
@@ -30,6 +33,7 @@ import java.util.regex.Pattern;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -38,8 +42,8 @@ import org.apache.solr.common.params.Sol
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,10 +138,10 @@ public final class ZkController {
public void command() {
try {
// we need to create all of our lost watches
- zkStateReader.makeCollectionsNodeWatches();
- zkStateReader.makeShardsWatches(true);
+// zkStateReader.makeCollectionsNodeWatches();
+// zkStateReader.makeShardsWatches(true);
createEphemeralLiveNode();
- zkStateReader.updateCloudState(false);
+ zkStateReader.createClusterStateWatchersAndUpdate();
// re register all descriptors
List<CoreDescriptor> descriptors = registerOnReconnect
@@ -175,47 +179,6 @@ public final class ZkController {
}
/**
- * Adds the /collection/shards/shards_id node as well as the /collections/leader_elect/shards_id node.
- *
- * @param shardId
- * @param collection
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
- */
- private void addZkShardsNode(String shardId, String collection)
- throws IOException, InterruptedException, KeeperException {
-
- String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
- + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
-
-
- try {
-
- // shards node
- if (!zkClient.exists(shardsZkPath)) {
- if (log.isInfoEnabled()) {
- log.info("creating zk shards node:" + shardsZkPath);
- }
- // makes shards zkNode if it doesn't exist
- zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
-
- }
- } catch (KeeperException e) {
- // its okay if another beats us creating the node
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- throw e;
- }
- }
-
- leaderElector.setupForSlice(shardId, collection);
-
- // TODO: consider how these notifications are being done
- // ping that there is a new shardId
- zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[]) null);
- }
-
- /**
* Closes the underlying ZooKeeper client.
*/
public void close() {
@@ -331,7 +294,7 @@ public final class ZkController {
createEphemeralLiveNode();
setUpCollectionsNode();
- zkStateReader.makeCollectionsNodeWatches();
+ zkStateReader.createClusterStateWatchersAndUpdate();
} catch (IOException e) {
log.error("", e);
@@ -356,47 +319,7 @@ public final class ZkController {
String nodeName = getNodeName();
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
log.info("Register node as live in ZooKeeper:" + nodePath);
- Watcher liveNodeWatcher = new Watcher() {
-
- public void process(WatchedEvent event) {
- try {
- log.info("Updating live nodes:" + zkClient);
- try {
- zkStateReader.updateLiveNodes();
- } finally {
- // re-make watch
-
- String path = event.getPath();
- if(path == null) {
- // on shutdown, it appears this can trigger with a null path
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- zkClient.getChildren(event.getPath(), this);
- }
- } catch (KeeperException e) {
- if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
-
- }
-
- };
+
try {
boolean nodeDeleted = true;
try {
@@ -422,15 +345,7 @@ public final class ZkController {
if (e.code() != KeeperException.Code.NODEEXISTS) {
throw e;
}
- }
- zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
- try {
- zkStateReader.updateLiveNodes();
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
+ }
}
public String getNodeName() {
@@ -504,44 +419,17 @@ public final class ZkController {
shardId = assignShard.assignShard(collection, numShards);
cloudDesc.setShardId(shardId);
}
- String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
-
- boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
-
- if (log.isInfoEnabled()) {
- log.info("Register shard - core:" + coreName + " address:"
- + shardUrl);
- }
-
- ZkNodeProps props = getShardZkProps(shardUrl);
-
- byte[] bytes = props.store();
String shardZkNodeName = getNodeName() + "_" + coreName;
- if(shardZkNodeAlreadyExists) {
- zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
- // tell everyone to update cloud info
- zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
- } else {
- addZkShardsNode(shardId, collection);
- try {
- log.info("create node:" + shardZkNodeName);
- zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
- CreateMode.PERSISTENT);
-
- // tell everyone to update cloud info
- zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
- } catch (KeeperException e) {
- // its okay if the node already exists
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- throw e;
- }
- // for some reason the shard already exists, though it didn't when we
- // started registration - just continue
-
+ if (log.isInfoEnabled()) {
+ log.info("Register shard - core:" + coreName + " address:"
+ + shardUrl);
}
- }
+
+ leaderElector.setupForSlice(shardId, collection);
+
+ ZkNodeProps props = addToZk(cloudDesc, shardUrl, shardZkNodeName);
// leader election
doLeaderElectionProcess(shardId, collection, shardZkNodeName, props);
@@ -549,11 +437,71 @@ public final class ZkController {
}
- private ZkNodeProps getShardZkProps(String shardUrl) {
+ ZkNodeProps addToZk(final CloudDescriptor cloudDesc, String shardUrl,
+ final String shardZkNodeName)
+ throws KeeperException, InterruptedException,
+ UnsupportedEncodingException, IOException {
ZkNodeProps props = new ZkNodeProps();
props.put(ZkStateReader.URL_PROP, shardUrl);
props.put(ZkStateReader.NODE_NAME, getNodeName());
+
+ props.put("roles", cloudDesc.getRoles());
+
+ Map<String, ZkNodeProps> shardProps = new HashMap<String, ZkNodeProps>();
+ shardProps.put(shardZkNodeName, props);
+ Slice slice = new Slice(cloudDesc.getShardId(), shardProps);
+
+ boolean persisted = false;
+ Stat stat = zkClient.exists(ZkStateReader.CLUSTER_STATE, null);
+ if (stat == null) {
+ log.info("/clusterstate does not exist, attempting to create");
+ try {
+ CloudState state = new CloudState();
+
+ state.addSlice(cloudDesc.getCollectionName(), slice);
+
+ zkClient.create(ZkStateReader.CLUSTER_STATE,
+ CloudState.store(state), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ persisted = true;
+ log.info("/clusterstate created");
+ } catch (KeeperException e) {
+ if (e.code() != Code.NODEEXISTS) {
+ // If this node exists, no big deal
+ throw e;
+ }
+ }
+ }
+ if (!persisted) {
+ stat = new Stat();
+ boolean updated = false;
+
+ // TODO: we don't want to retry forever
+ // give up at some point
+ while (!updated) {
+
+ byte[] data = zkClient.getData(ZkStateReader.CLUSTER_STATE,
+ null, stat);
+ log.info("Attempting to update /clusterstate version "
+ + stat.getVersion());
+ CloudState state = CloudState.load(data);
+
+ state.addSlice(cloudDesc.getCollectionName(), slice);
+
+ try {
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ CloudState.store(state), stat.getVersion());
+ updated = true;
+ } catch (KeeperException e) {
+ if (e.code() != Code.BADVERSION) {
+ throw e;
+ }
+ log.info("Failed to update /clusterstate, retrying");
+ }
+
+ }
+ }
return props;
}
@@ -724,21 +672,6 @@ public final class ZkController {
throw e;
}
}
- try {
- // shards node
- if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
- + ZkStateReader.SHARDS_ZKNODE)) {
- // makes shards zkNode if it doesn't exist
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection
- + ZkStateReader.SHARDS_ZKNODE);
- }
- } catch (KeeperException e) {
- // its okay if another beats us creating the node
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- throw e;
- }
- }
-
// ping that there is a new collection
zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/core/CoreContainer.java Fri Oct 14 22:36:35 2011
@@ -411,6 +411,10 @@ public class CoreContainer
if (opt != null) {
p.getCloudDescriptor().setCollectionName(opt);
}
+ opt = DOMUtil.getAttr(node, "roles", null);
+ if(opt != null){
+ p.getCloudDescriptor().setRoles(opt);
+ }
}
opt = DOMUtil.getAttr(node, "properties", null);
if (opt != null) {
@@ -431,29 +435,6 @@ public class CoreContainer
SolrException.logOnce(log,null,ex);
}
}
-
- if(zkController != null) {
- try {
- synchronized (zkController.getZkStateReader().getUpdateLock()) {
- zkController.getZkStateReader().makeShardZkNodeWatches(false);
- zkController.getZkStateReader().updateCloudState(true);
- }
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (KeeperException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- }
}
private Properties readProperties(Config cfg, Node node) throws XPathExpressionException {
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Fri Oct 14 22:36:35 2011
@@ -19,7 +19,6 @@ package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,17 +49,6 @@ public class CloudStateUpdateTest extend
private static final boolean VERBOSE = false;
- private static final String URL1 = "http://localhost:3133/solr/core0";
- private static final String URL3 = "http://localhost:3133/solr/core1";
- private static final String URL2 = "http://localhost:3123/solr/core1";
- private static final String URL4 = "http://localhost:3123/solr/core4";
- private static final String SHARD4 = "localhost:3123_solr_core4";
- private static final String SHARD3 = "localhost:3123_solr_core3";
- private static final String SHARD2 = "localhost:3123_solr_core2";
- private static final String SHARD1 = "localhost:3123_solr_core1";
-
- private static final int TIMEOUT = 10000;
-
protected ZkTestServer zkServer;
protected String zkDir;
@@ -138,77 +126,6 @@ public class CloudStateUpdateTest extend
log.info("####SETUP_END " + getName());
}
-
- @Test
- public void testIncrementalUpdate() throws Exception {
- System.setProperty("CLOUD_UPDATE_DELAY", "1");
- String zkDir = dataDir.getAbsolutePath() + File.separator
- + "zookeeper/server1/data";
- ZkTestServer server = null;
- SolrZkClient zkClient = null;
- ZkController zkController = null;
-
- server = new ZkTestServer(zkDir);
- server.run();
- try {
- AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
- AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-
- zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
- String shardsPath1 = "/collections/collection1/shards/shardid1";
- String shardsPath2 = "/collections/collection1/shards/shardid2";
- zkClient.makePath(shardsPath1);
- zkClient.makePath(shardsPath2);
-
- addShardToZk(zkClient, shardsPath1, SHARD1, URL1);
- addShardToZk(zkClient, shardsPath1, SHARD2, URL2);
- addShardToZk(zkClient, shardsPath2, SHARD3, URL3);
-
- removeShardFromZk(server.getZkAddress(), zkClient, shardsPath1);
-
- zkController = new ZkController(server.getZkAddress(), TIMEOUT, 1000,
- "localhost", "8983", "solr", 3, new CurrentCoreDescriptorProvider() {
-
- @Override
- public List<CoreDescriptor> getCurrentDescriptors() {
- // unused
- return null;
- }
- });
-
- zkController.getZkStateReader().updateCloudState(true);
- CloudState cloudInfo = zkController.getCloudState();
- Map<String,Slice> slices = cloudInfo.getSlices("collection1");
- assertFalse(slices.containsKey("shardid1"));
-
- zkClient.makePath(shardsPath1);
- addShardToZk(zkClient, shardsPath1, SHARD1, URL1);
-
- zkController.getZkStateReader().updateCloudState(true);
- cloudInfo = zkController.getCloudState();
- slices = cloudInfo.getSlices("collection1");
- assertTrue(slices.containsKey("shardid1"));
-
- updateUrl(zkClient, shardsPath1, SHARD1, "fake");
-
- addShardToZk(zkClient, shardsPath2, SHARD4, URL4);
-
- zkController.getZkStateReader().updateCloudState(true);
- cloudInfo = zkController.getCloudState();
- String url = cloudInfo.getSlices("collection1").get("shardid1").getShards().get(SHARD1).get("url");
-
- // because of incremental update, we don't expect to find the new 'fake'
- // url - instead we should still
- // be using the original url - the correct way to update this would be to
- // remove the whole node and readd it
- assertEquals(URL1, url);
-
- } finally {
- server.shutdown();
- zkClient.close();
- zkController.close();
- }
- }
@Test
public void testCoreRegistration() throws Exception {
@@ -323,37 +240,6 @@ public class CloudStateUpdateTest extend
System.clearProperty("CLOUD_UPDATE_DELAY");
SolrConfig.severeErrors.clear();
}
-
- private void addShardToZk(SolrZkClient zkClient, String shardsPath,
- String zkNodeName, String url) throws IOException,
- KeeperException, InterruptedException {
-
- ZkNodeProps props = new ZkNodeProps();
- props.put(ZkStateReader.URL_PROP, url);
- props.put(ZkStateReader.NODE_NAME, zkNodeName);
- byte[] bytes = props.store();
-
- zkClient
- .create(shardsPath + "/" + zkNodeName, bytes, CreateMode.PERSISTENT);
- }
-
- private void updateUrl(SolrZkClient zkClient, String shardsPath,
- String zkNodeName, String url) throws IOException,
- KeeperException, InterruptedException {
-
- ZkNodeProps props = new ZkNodeProps();
- props.put(ZkStateReader.URL_PROP, url);
- props.put(ZkStateReader.NODE_NAME, zkNodeName);
- byte[] bytes = props.store();
-
- zkClient
- .setData(shardsPath + "/" + zkNodeName, bytes);
- }
-
- private void removeShardFromZk(String zkHost, SolrZkClient zkClient, String shardsPath) throws Exception {
-
- AbstractZkTestCase.tryCleanPath(zkHost, shardsPath);
- }
private void printLayout(String zkHost) throws Exception {
SolrZkClient zkClient = new SolrZkClient(
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Fri Oct 14 22:36:35 2011
@@ -38,7 +38,7 @@ import org.junit.BeforeClass;
public class FullDistributedZkTest extends AbstractDistributedZkTestCase {
private static final String DEFAULT_COLLECTION = "collection1";
- private static final boolean DEBUG = false;
+ private static final boolean DEBUG = true;
String t1="a_t";
String i1="a_si";
String nint = "n_i";
@@ -141,12 +141,7 @@ public class FullDistributedZkTest exten
*/
@Override
public void doTest() throws Exception {
- printLayout();
- // make sure 'shard1' was auto-assigned
- SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT);
- assertTrue("shard1 was not found in zk layout", zkClient.exists("/solr/collections/collection1/shards/shard1"));
- zkClient.close();
-
+
del("*:*");
indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men"
,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Fri Oct 14 22:36:35 2011
@@ -64,86 +64,6 @@ public class ZkControllerTest extends So
}
@Test
- public void testReadShards() throws Exception {
- String zkDir = dataDir.getAbsolutePath() + File.separator
- + "zookeeper/server1/data";
- ZkTestServer server = null;
- SolrZkClient zkClient = null;
- ZkController zkController = null;
- try {
- server = new ZkTestServer(zkDir);
- server.run();
- AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
- AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-
- zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
- String shardsPath = "/collections/collection1/shards/shardid1";
- zkClient.makePath(shardsPath);
-
- addShardToZk(zkClient, shardsPath, SHARD1, URL1);
- addShardToZk(zkClient, shardsPath, SHARD2, URL2);
- addShardToZk(zkClient, shardsPath, SHARD3, URL3);
-
- if (DEBUG) {
- zkClient.printLayoutToStdOut();
- }
-
- zkController = new ZkController(server.getZkAddress(),
- TIMEOUT, 1000, "localhost", "8983", "solr", 3, new CurrentCoreDescriptorProvider() {
-
- @Override
- public List<CoreDescriptor> getCurrentDescriptors() {
- // do nothing
- return null;
- }
- });
-
- zkController.getZkStateReader().updateCloudState(true);
- CloudState cloudInfo = zkController.getCloudState();
- Map<String,Slice> slices = cloudInfo.getSlices("collection1");
- assertNotNull(slices);
-
- for (Slice slice : slices.values()) {
- Map<String,ZkNodeProps> shards = slice.getShards();
- if (DEBUG) {
- for (String shardName : shards.keySet()) {
- ZkNodeProps props = shards.get(shardName);
- System.out.println("shard:" + shardName);
- System.out.println("props:" + props.toString());
- }
- }
- assertNotNull(shards.get(SHARD1));
- assertNotNull(shards.get(SHARD2));
- assertNotNull(shards.get(SHARD3));
-
- ZkNodeProps props = shards.get(SHARD1);
- assertEquals(URL1, props.get(ZkStateReader.URL_PROP));
- assertEquals(TEST_NODE_NAME, props.get(ZkStateReader.NODE_NAME));
-
- props = shards.get(SHARD2);
- assertEquals(URL2, props.get(ZkStateReader.URL_PROP));
- assertEquals(TEST_NODE_NAME, props.get(ZkStateReader.NODE_NAME));
-
- props = shards.get(SHARD3);
- assertEquals(URL3, props.get(ZkStateReader.URL_PROP));
- assertEquals(TEST_NODE_NAME, props.get(ZkStateReader.NODE_NAME));
-
- }
-
- } finally {
- if (zkClient != null) {
- zkClient.close();
- }
- if (zkController != null) {
- zkController.close();
- }
- if (server != null) {
- server.shutdown();
- }
- }
- }
-
- @Test
public void testReadConfigName() throws Exception {
String zkDir = dataDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrServer.java Fri Oct 14 22:36:35 2011
@@ -93,9 +93,7 @@ public class CloudSolrServer extends Sol
if (zkStateReader != null) return;
try {
ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
- zk.makeCollectionsNodeWatches();
- zk.makeShardZkNodeWatches(false);
- zk.updateCloudState(true);
+ zk.createClusterStateWatchersAndUpdate();
zkStateReader = zk;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java Fri Oct 14 22:36:35 2011
@@ -17,7 +17,13 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -25,141 +31,206 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.zookeeper.KeeperException;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.common.util.XMLErrorLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
// immutable
public class CloudState {
- protected static Logger log = LoggerFactory.getLogger(CloudState.class);
-
- private final Map<String,Map<String,Slice>> collectionStates;
- private final Set<String> liveNodes;
-
- public CloudState(Set<String> liveNodes, Map<String,Map<String,Slice>> collectionStates) {
- this.liveNodes = liveNodes;
- this.collectionStates = collectionStates;
- }
-
- public Map<String,Slice> getSlices(String collection) {
- Map<String,Slice> collectionState = collectionStates.get(collection);
- if(collectionState == null) {
- return null;
- }
- return Collections.unmodifiableMap(collectionState);
- }
-
- public Set<String> getCollections() {
- return Collections.unmodifiableSet(collectionStates.keySet());
- }
-
- public Map<String,Map<String,Slice>> getCollectionStates() {
- return Collections.unmodifiableMap(collectionStates);
- }
-
- public Set<String> getLiveNodes() {
- return Collections.unmodifiableSet(liveNodes);
- }
-
- public boolean liveNodesContain(String name) {
- return liveNodes.contains(name);
- }
-
- public static CloudState buildCloudState(SolrZkClient zkClient, CloudState oldCloudState, boolean onlyLiveNodes) throws KeeperException, InterruptedException, IOException {
- Map<String,Map<String,Slice>> collectionStates;
- if (!onlyLiveNodes) {
- List<String> collections = zkClient.getChildren(
- ZkStateReader.COLLECTIONS_ZKNODE, null);
-
- collectionStates = new HashMap<String,Map<String,Slice>>();
- for (String collection : collections) {
- String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/"
- + collection + ZkStateReader.SHARDS_ZKNODE;
- List<String> shardIdNames;
- try {
- shardIdNames = zkClient.getChildren(shardIdPaths, null);
- } catch (KeeperException.NoNodeException e) {
- // node is not valid currently
- continue;
- }
- Map<String,Slice> slices = new HashMap<String,Slice>();
- for (String shardIdZkPath : shardIdNames) {
- Slice oldSlice = null;
- if (oldCloudState.getCollectionStates().containsKey(collection)
- && oldCloudState.getCollectionStates().get(collection)
- .containsKey(shardIdZkPath)) {
- oldSlice = oldCloudState.getCollectionStates().get(collection)
- .get(shardIdZkPath);
- }
-
- Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths
- + "/" + shardIdZkPath, oldSlice);
- Slice slice = new Slice(shardIdZkPath, shardsMap);
- slices.put(shardIdZkPath, slice);
- }
- collectionStates.put(collection, slices);
- }
- } else {
- collectionStates = oldCloudState.getCollectionStates();
- }
-
- CloudState cloudInfo = new CloudState(getLiveNodes(zkClient), collectionStates);
-
- return cloudInfo;
- }
-
- /**
- * @param zkClient
- * @param shardsZkPath
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- * @throws IOException
- */
- private static Map<String,ZkNodeProps> readShards(SolrZkClient zkClient, String shardsZkPath, Slice oldSlice)
- throws KeeperException, InterruptedException, IOException {
-
- Map<String,ZkNodeProps> shardNameToProps = new HashMap<String,ZkNodeProps>();
-
- if (zkClient.exists(shardsZkPath, null) == null) {
- throw new IllegalStateException("Cannot find zk shards node that should exist:"
- + shardsZkPath);
- }
-
- List<String> shardZkPaths = zkClient.getChildren(shardsZkPath, null);
-
- for (String shardPath : shardZkPaths) {
- ZkNodeProps props;
- if (oldSlice != null && oldSlice.getShards().containsKey(shardPath)) {
- props = oldSlice.getShards().get(shardPath);
- } else {
- byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
- null);
-
- props = new ZkNodeProps();
- props.load(data);
- }
-
- shardNameToProps.put(shardPath, props);
- }
+ protected static Logger log = LoggerFactory.getLogger(CloudState.class);
+ private static final XMLErrorLogger xmllog = new XMLErrorLogger(log);
+ private Map<String, Map<String, Slice>> collectionStates;
+ private Set<String> liveNodes;
+
+ public CloudState() {
+ this.liveNodes = new HashSet<String>();
+ this.collectionStates = new HashMap<String, Map<String, Slice>>(0);
+ }
+
+ public CloudState(Set<String> liveNodes,
+ Map<String, Map<String, Slice>> collectionStates) {
+ this.liveNodes = liveNodes;
+ this.collectionStates = collectionStates;
+ }
+
+ public Slice getSlice(String collection, String slice) {
+ if (collectionStates.containsKey(collection)
+ && collectionStates.get(collection).containsKey(slice))
+ return collectionStates.get(collection).get(slice);
+ return null;
+ }
+
+ public void addSlice(String collection, Slice slice) {
+ if (!collectionStates.containsKey(collection)) {
+ log.info("New collection");
+ collectionStates.put(collection, new HashMap<String, Slice>());
+ }
+ if (!collectionStates.get(collection).containsKey(slice.getName())) {
+ log.info("New slice: " + slice.getName());
+ collectionStates.get(collection).put(slice.getName(), slice);
+ } else {
+ log.info("Updating existing slice");
+
+ Map<String, ZkNodeProps> shards = new HashMap<String, ZkNodeProps>();
+
+ Slice existingSlice = collectionStates.get(collection).get(slice.getName());
+ shards.putAll(existingSlice.getShards());
+ shards.putAll(slice.getShards());
+ Slice updatedSlice = new Slice(slice.getName(), shards);
+ collectionStates.get(collection).put(slice.getName(), updatedSlice);
+ }
+ }
+
+ public Map<String, Slice> getSlices(String collection) {
+ if(!collectionStates.containsKey(collection))
+ return null;
+ return Collections.unmodifiableMap(collectionStates.get(collection));
+ }
+
+ public Set<String> getCollections() {
+ return Collections.unmodifiableSet(collectionStates.keySet());
+ }
+
+ public Map<String, Map<String, Slice>> getCollectionStates() {
+ return Collections.unmodifiableMap(collectionStates);
+ }
+
+ public Set<String> getLiveNodes() {
+ return Collections.unmodifiableSet(liveNodes);
+ }
+
+ public void setLiveNodes(Set<String> liveNodes) {
+ this.liveNodes = liveNodes;
+ }
+
+ public boolean liveNodesContain(String name) {
+ return liveNodes.contains(name);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("live nodes:" + liveNodes);
+ sb.append(" collections:" + collectionStates);
+ return sb.toString();
+ }
+
+ public static CloudState load(byte[] state) {
+ // TODO this should throw some exception instead of eating them
+ CloudState cloudState = new CloudState();
+ if(state != null && state.length > 0) {
+ InputSource is = new InputSource(new ByteArrayInputStream(state));
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+
+ try {
+ DocumentBuilder db = dbf.newDocumentBuilder();
+
+ db.setErrorHandler(xmllog);
+ Document doc = db.parse(is);
+
+ Element root = doc.getDocumentElement();
+
+ NodeList collectionStates = root.getChildNodes();
+ for (int x = 0; x < collectionStates.getLength(); x++) {
+ Node collectionState = collectionStates.item(x);
+ String collectionName = collectionState.getAttributes()
+ .getNamedItem("name").getNodeValue();
+ NodeList slices = collectionState.getChildNodes();
+ for (int y = 0; y < slices.getLength(); y++) {
+ Node slice = slices.item(y);
+ Node sliceName = slice.getAttributes().getNamedItem("name");
+
+ NodeList shardsNodeList = slice.getChildNodes();
+ Map<String, ZkNodeProps> shards = new HashMap<String, ZkNodeProps>();
+ for (int z = 0; z < shardsNodeList.getLength(); z++) {
+ Node shard = shardsNodeList.item(z);
+ String shardName = shard.getAttributes()
+ .getNamedItem("name").getNodeValue();
+ NodeList propsList = shard.getChildNodes();
+ ZkNodeProps props = new ZkNodeProps();
+
+ for (int i = 0; i < propsList.getLength(); i++) {
+ Node prop = propsList.item(i);
+ String propName = prop.getAttributes()
+ .getNamedItem("name").getNodeValue();
+ String propValue = prop.getTextContent();
+ props.put(propName, propValue);
+ }
+ shards.put(shardName, props);
+ }
+ Slice s = new Slice(sliceName.getNodeValue(), shards);
+ cloudState.addSlice(collectionName, s);
+ }
+ }
+ } catch (SAXException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (ParserConfigurationException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally {
+ // some XML parsers are broken and don't close the byte stream (but
+ // they should according to spec)
+ IOUtils.closeQuietly(is.getByteStream());
+ }
+ }
+ return cloudState;
+ }
+
+ public static byte[] store(CloudState state)
+ throws UnsupportedEncodingException, IOException {
+ StringWriter stringWriter = new StringWriter();
+ Writer w = new BufferedWriter(stringWriter);
+ w.write("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n");
+ w.write("<clusterstate>");
+ Map<String, Map<String, Slice>> collectionStates = state
+ .getCollectionStates();
+ for (String collectionName : collectionStates.keySet()) {
+ w.write("<collectionstate name=\"" + collectionName + "\">");
+ Map<String, Slice> collection = collectionStates
+ .get(collectionName);
+ for (String sliceName : collection.keySet()) {
+ w.write("<shard name=\"" + sliceName + "\">");
+ Slice slice = collection.get(sliceName);
+ Map<String, ZkNodeProps> shards = slice.getShards();
+ for (String shardName : shards.keySet()) {
+ w.write("<replica name=\"" + shardName + "\">");
+ ZkNodeProps props = shards.get(shardName);
+ for (String propName : props.keySet()) {
+ w.write("<str name=\"" + propName + "\">"
+ + props.get(propName) + "</str>");
+ }
+ w.write("</replica>");
+
+ }
+ w.write("</shard>");
+ }
+ w.write("</collectionstate>");
+ }
+ w.write("</clusterstate>");
+ w.flush();
+ w.close();
+ return stringWriter.toString().getBytes("UTF-8");
- return Collections.unmodifiableMap(shardNameToProps);
- }
-
- private static Set<String> getLiveNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
- List<String> liveNodes = zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null);
+ }
+
+ public void setLiveNodes(List<String> liveNodes) {
Set<String> liveNodesSet = new HashSet<String>(liveNodes.size());
liveNodesSet.addAll(liveNodes);
-
- return liveNodesSet;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("live nodes:" + liveNodes);
- sb.append(" collections:" + collectionStates);
- return sb.toString();
+ this.liveNodes = liveNodesSet;
}
-
}
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1183538&r1=1183537&r2=1183538&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Oct 14 22:36:35 2011
@@ -18,11 +18,7 @@ package org.apache.solr.common.cloud;
*/
import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -30,10 +26,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +39,12 @@ public class ZkStateReader {
public static final String COLLECTIONS_ZKNODE = "/collections";
public static final String URL_PROP = "url";
public static final String NODE_NAME = "node_name";
- public static final String SHARDS_ZKNODE = "/shards";
+ public static final String ROLES_PROP = "roles";
public static final String LIVE_NODES_ZKNODE = "/live_nodes";
+ public static final String CLUSTER_STATE = "/clusterstate";
- private volatile CloudState cloudState = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
-
+ private volatile CloudState cloudState = new CloudState();
+
private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
@@ -81,9 +78,7 @@ public class ZkStateReader {
public void command() {
try {
- makeCollectionsNodeWatches();
- makeShardsWatches(true);
- updateCloudState(false);
+ ZkStateReader.this.createClusterStateWatchersAndUpdate();
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -94,10 +89,6 @@ public class ZkStateReader {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
}
}
@@ -116,24 +107,131 @@ public class ZkStateReader {
updateCloudState(true, true);
}
- // load and publish a new CollectionInfo
- private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
- IOException {
+ public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
+ InterruptedException {
+ // We need to fetch the current cluster state and the set of live nodes
+
+ if (!zkClient.exists(CLUSTER_STATE)) {
+ try {
+ zkClient.create(CLUSTER_STATE, null, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ // if someone beats us to creating this ignore it
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ throw e;
+ }
+ }
+ }
+
+ CloudState clusterState;
+ log.info("Updating cluster state from ZooKeeper... ");
+ byte[] data = zkClient.getData(CLUSTER_STATE, new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ log.info("A cluster state change has occurred");
+ try {
+ byte[] data = zkClient.getData(CLUSTER_STATE, this, null);
+ // delayed approach
+ // ZkStateReader.this.updateCloudState(false, false);
+ synchronized (ZkStateReader.this.getUpdateLock()) {
+ CloudState clusterState = CloudState.load(data);
+ clusterState.setLiveNodes(ZkStateReader.this.cloudState
+ .getLiveNodes());
+ // update volatile
+ cloudState = clusterState;
+ }
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } /*
+ * catch(IOException e){ log.error("", e); throw new
+ * ZooKeeperException( SolrException.ErrorCode.SERVER_ERROR, "", e); }
+ */
+ }
+
+ }, null);
+
+ clusterState = CloudState.load(data);
+
+ List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
+ new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ log.info("Updating live nodes");
+ try {
+ // delayed approach
+ // ZkStateReader.this.updateCloudState(false, true);
+ synchronized (ZkStateReader.this.getUpdateLock()) {
+ List<String> liveNodes = zkClient.getChildren(
+ LIVE_NODES_ZKNODE, this);
+ ZkStateReader.this.cloudState.setLiveNodes(liveNodes);
+ }
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ }
+
+ });
+
+ clusterState.setLiveNodes(liveNodes);
+ this.cloudState = clusterState;
+
+ }
+
+
+ // load and publish a new CollectionInfo
+ private synchronized void updateCloudState(boolean immediate,
+ final boolean onlyLiveNodes) throws KeeperException,
+ InterruptedException, IOException {
+ log.info("Manual update of cluster state initiated");
// build immutable CloudInfo
- if(immediate) {
- if(!onlyLiveNodes) {
+ if (immediate) {
+ CloudState clusterState;
+ if (!onlyLiveNodes) {
log.info("Updating cloud state from ZooKeeper... ");
+ byte[] data = zkClient.getData(ZkStateReader.CLUSTER_STATE, null, null);
+
+ clusterState = CloudState.load(data);
} else {
log.info("Updating live nodes from ZooKeeper... ");
+ clusterState = cloudState;
}
- CloudState cloudState;
- cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
+
+ List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE, null);
+
+ clusterState.setLiveNodes(liveNodes);
// update volatile
- this.cloudState = cloudState;
+ this.cloudState = clusterState;
} else {
- if(cloudStateUpdateScheduled) {
+ if (cloudStateUpdateScheduled) {
log.info("Cloud state update for ZooKeeper already scheduled");
return;
}
@@ -142,15 +240,31 @@ public class ZkStateReader {
updateCloudExecutor.schedule(new Runnable() {
public void run() {
- log.info("Updating cloud state from ZooKeeper...");
+ log.info("Updating cluster state from ZooKeeper...");
synchronized (getUpdateLock()) {
cloudStateUpdateScheduled = false;
- CloudState cloudState;
+ CloudState clusterState;
try {
- cloudState = CloudState.buildCloudState(zkClient,
- ZkStateReader.this.cloudState, onlyLiveNodes);
+ if (!onlyLiveNodes) {
+ log.info("Updating cloud state from ZooKeeper... ");
+ byte[] data = zkClient.getData(ZkStateReader.CLUSTER_STATE,
+ null, null);
+
+ clusterState = CloudState.load(data);
+ } else {
+ log.info("Updating live nodes from ZooKeeper... ");
+ clusterState = ZkStateReader.this.cloudState;
+ }
+
+ List<String> liveNodes = zkClient.getChildren(LIVE_NODES_ZKNODE,
+ null);
+ clusterState.setLiveNodes(liveNodes);
+ // update volatile
+ ZkStateReader.this.cloudState = clusterState;
+
} catch (KeeperException e) {
- if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
return;
}
@@ -163,10 +277,6 @@ public class ZkStateReader {
log.error("", e);
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
}
// update volatile
ZkStateReader.this.cloudState = cloudState;
@@ -174,123 +284,9 @@ public class ZkStateReader {
}
}, CLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
}
-
- }
-
- public void makeShardZkNodeWatches(boolean makeWatchesForReconnect) throws KeeperException, InterruptedException {
- CloudState cloudState = getCloudState();
- Set<String> knownCollections = cloudState.getCollections();
- List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
-
- for(final String collection : collections) {
- if(makeWatchesForReconnect || !knownCollections.contains(collection)) {
- log.info("Found new collection:" + collection);
- Watcher watcher = new Watcher() {
- public void process(WatchedEvent event) {
- log.info("Detected changed ShardId in collection:" + collection);
- try {
- makeShardsWatches(collection, false);
- updateCloudState(false);
- } catch (KeeperException e) {
- if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.warn("", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("", e);
- } catch (IOException e) {
- log.warn("", e);
- }
- }
- };
- boolean madeWatch = true;
- String shardZkNode = COLLECTIONS_ZKNODE + "/" + collection
- + SHARDS_ZKNODE;
- for (int i = 0; i < 5; i++) {
- try {
- zkClient.getChildren(shardZkNode, watcher);
- } catch (KeeperException.NoNodeException e) {
- // most likely, the collections node has been created, but not the
- // shards node yet -- pause and try again
- madeWatch = false;
- if (i == 4) {
- log.error("Could not set shards zknode watch, because the zknode does not exist:" + shardZkNode);
- break;
- }
- Thread.sleep(100);
- }
- if (madeWatch) {
- log.info("Made shard watch:" + shardZkNode);
- break;
- }
- }
- }
- }
}
-
- public void makeShardsWatches(final String collection, boolean makeWatchesForReconnect) throws KeeperException,
- InterruptedException {
- if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
- List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
- + collection + SHARDS_ZKNODE, null);
- CloudState cloudState = getCloudState();
- Set<String> knownShardIds;
- Map<String,Slice> slices = cloudState.getSlices(collection);
- if (slices != null) {
- knownShardIds = slices.keySet();
- } else {
- knownShardIds = new HashSet<String>(0);
- }
- for (final String shardId : shardIds) {
- if (makeWatchesForReconnect || !knownShardIds.contains(shardId)) {
- zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
- + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
-
- public void process(WatchedEvent event) {
- log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
- try {
- updateCloudState(false);
- } catch (KeeperException e) {
- if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- }
- });
- }
- }
- }
- }
-
- /**
- * @throws KeeperException
- * @throws InterruptedException
- */
- public void makeShardsWatches(boolean makeWatchesForReconnect) throws KeeperException, InterruptedException {
- List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
- for (final String collection : collections) {
- makeShardsWatches(collection, makeWatchesForReconnect);
- }
- }
-
+
/**
* @return information about the cluster from ZooKeeper
*/
@@ -315,74 +311,13 @@ public class ZkStateReader {
}
}
}
+
+ abstract class RunnableWatcher implements Runnable {
+ Watcher watcher;
+ public RunnableWatcher(Watcher watcher){
+ this.watcher = watcher;
+ }
- public void makeCollectionsNodeWatches() throws KeeperException, InterruptedException {
- log.info("Start watching collections zk node for changes");
- zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
-
- public void process(WatchedEvent event) {
- try {
-
- log.info("Detected a new or removed collection");
- synchronized (getUpdateLock()) {
- makeShardZkNodeWatches(false);
- updateCloudState(false);
- }
- // re-watch
- String path = event.getPath();
- if (path != null) {
- zkClient.getChildren(path, this);
- }
- } catch (KeeperException e) {
- if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.warn("", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("", e);
- } catch (IOException e) {
- log.warn("", e);
- }
-
- }});
-
- zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
-
- public void process(WatchedEvent event) {
- if(event.getType() != EventType.NodeDataChanged) {
- return;
- }
- log.info("Notified of CloudState change");
- try {
- synchronized (getUpdateLock()) {
- makeShardZkNodeWatches(false);
- updateCloudState(false);
- }
- zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, this);
- } catch (KeeperException e) {
- if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
-
- }});
-
- }
+ }
+
}