You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/02/12 18:59:42 UTC
svn commit: r909537 - in /lucene/solr/branches/cloud/src:
common/org/apache/solr/common/cloud/ java/org/apache/solr/cloud/
java/org/apache/solr/handler/component/
solrj/org/apache/solr/client/solrj/impl/ test/org/apache/solr/cloud/
Author: markrmiller
Date: Fri Feb 12 17:59:41 2010
New Revision: 909537
URL: http://svn.apache.org/viewvc?rev=909537&view=rev
Log:
more reorganization for solrj
Added:
lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/
lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java
- copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java
- copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
- copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java
lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java
- copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java
- copied, changed from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java
lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java
- copied, changed from r909510, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java
Removed:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java
lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/CloudState.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -25,6 +25,10 @@
import java.util.Map;
import java.util.Set;
+import org.apache.solr.cloud.Slice;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkNodeProps;
+import org.apache.solr.cloud.ZooKeeperException;
import org.apache.solr.common.SolrException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -72,12 +76,12 @@
Map<String,Map<String,Slice>> collectionStates;
if (!onlyLiveNodes) {
List<String> collections = zkClient.getChildren(
- ZkController.COLLECTIONS_ZKNODE, null);
+ ZkStateReader.COLLECTIONS_ZKNODE, null);
collectionStates = new HashMap<String,Map<String,Slice>>();
for (String collection : collections) {
- String shardIdPaths = ZkController.COLLECTIONS_ZKNODE + "/"
- + collection + ZkController.SHARDS_ZKNODE;
+ String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/"
+ + collection + ZkStateReader.SHARDS_ZKNODE;
List<String> shardIdNames;
try {
shardIdNames = zkClient.getChildren(shardIdPaths, null);
@@ -137,7 +141,7 @@
}
private static Set<String> getLiveNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
- List<String> liveNodes = zkClient.getChildren(ZkController.NODES_ZKNODE, null);
+ List<String> liveNodes = zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null);
Set<String> liveNodesSet = new HashSet<String>(liveNodes.size());
liveNodesSet.addAll(liveNodes);
Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ConnectionManager.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -21,7 +21,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
-import org.apache.solr.cloud.SolrZkClient.OnReconnect;
+import org.apache.solr.cloud.ZkClientConnectionStrategy;
+import org.apache.solr.cloud.ZkClientConnectionStrategy.ZkUpdate;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -23,6 +23,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.cloud.ZkClientConnectionStrategy;
+import org.apache.solr.cloud.ZkClientConnectionStrategy.ZkUpdate;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Added: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java?rev=909537&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java (added)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/OnReconnect.java Fri Feb 12 17:59:41 2010
@@ -0,0 +1,5 @@
+package org.apache.solr.common.cloud;
+
+public interface OnReconnect {
+ public void command();
+}
Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZkClient.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -24,6 +24,8 @@
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
+import org.apache.solr.cloud.ZkClientConnectionStrategy;
+import org.apache.solr.cloud.ZooKeeperException;
import org.apache.solr.cloud.ZkClientConnectionStrategy.ZkUpdate;
import org.apache.solr.common.SolrException;
import org.apache.zookeeper.CreateMode;
@@ -44,10 +46,6 @@
*/
public class SolrZkClient {
static final String NEWL = System.getProperty("line.separator");
-
- public static interface OnReconnect {
- public void command();
- }
static final int DEFAULT_CLIENT_CONNECT_TIMEOUT = 5000;
@@ -483,5 +481,9 @@
void updateKeeper(SolrZooKeeper keeper) {
this.keeper = keeper;
}
+
+ public SolrZooKeeper getSolrZooKeeper() {
+ return keeper;
+ }
}
Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java (from r909490, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java&r1=909490&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZooKeeper.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
import java.io.IOException;
@@ -14,7 +14,7 @@
// TODO Auto-generated constructor stub
}
- protected ClientCnxn getConnection() {
+ public ClientCnxn getConnection() {
return cnxn;
}
Copied: lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java (from r909510, lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java)
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java?p2=lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java&p1=lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java&r1=909510&r2=909537&rev=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java (original)
+++ lucene/solr/branches/cloud/src/common/org/apache/solr/common/cloud/ZkStateReader.java Fri Feb 12 17:59:41 2010
@@ -1,4 +1,4 @@
-package org.apache.solr.cloud;
+package org.apache.solr.common.cloud;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -20,21 +20,30 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.solr.cloud.SolrZkClient.OnReconnect;
+import org.apache.solr.cloud.Slice;
+import org.apache.solr.cloud.ZooKeeperException;
import org.apache.solr.common.SolrException;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkStateReader {
private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
+ public static final String COLLECTIONS_ZKNODE = "/collections";
+ public static final String SHARDS_ZKNODE = "/shards";
+ public static final String LIVE_NODES_ZKNODE = "/live_nodes";
+
private volatile CloudState cloudState = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
@@ -45,11 +54,14 @@
private SolrZkClient zkClient;
+ private boolean closeClient = false;
+
public ZkStateReader(SolrZkClient zkClient) {
this.zkClient = zkClient;
}
public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
+ closeClient = true;
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
// on reconnect, reload cloud info
new OnReconnect() {
@@ -149,6 +161,114 @@
}
+ public void addShardZkNodeWatches() throws KeeperException, InterruptedException {
+ CloudState cloudState = getCloudState();
+ Set<String> knownCollections = cloudState.getCollections();
+
+ List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+
+ for(final String collection : collections) {
+ if(!knownCollections.contains(collection)) {
+ log.info("Found new collection:" + collection);
+ Watcher watcher = new Watcher() {
+ public void process(WatchedEvent event) {
+ log.info("Detected changed ShardId in collection:" + collection);
+ try {
+ addShardsWatches(collection);
+ updateCloudState(false);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ }
+ };
+ boolean madeWatch = true;
+ String shardZkNode = COLLECTIONS_ZKNODE + "/" + collection
+ + SHARDS_ZKNODE;
+ for (int i = 0; i < 5; i++) {
+ try {
+ zkClient.getChildren(shardZkNode, watcher);
+ } catch (KeeperException.NoNodeException e) {
+ // most likely, the collections node has been created, but not the
+ // shards node yet -- pause and try again
+ madeWatch = false;
+ if (i == 4) {
+ log.error("Could not set shards zknode watch, because the zknode does not exist:" + shardZkNode);
+ break;
+ }
+ Thread.sleep(100);
+ }
+ if (madeWatch) {
+ log.info("Made shard watch:" + shardZkNode);
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ public void addShardsWatches(final String collection) throws KeeperException,
+ InterruptedException {
+ if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
+ List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
+ + collection + SHARDS_ZKNODE, null);
+ CloudState cloudState = getCloudState();
+ Set<String> knownShardIds;
+ Map<String,Slice> slices = cloudState.getSlices(collection);
+ if (slices != null) {
+ knownShardIds = slices.keySet();
+ } else {
+ knownShardIds = new HashSet<String>(0);
+ }
+ for (final String shardId : shardIds) {
+ if (!knownShardIds.contains(shardId)) {
+ zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
+ + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
+
+ public void process(WatchedEvent event) {
+ log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
+ try {
+ updateCloudState(false);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
+ });
+ }
+ }
+ }
+ }
+
+ public void addShardsWatches() throws KeeperException, InterruptedException {
+ List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+ for (final String collection : collections) {
+ addShardsWatches(collection);
+ }
+ }
+
/**
* @return information about the cluster from ZooKeeper
*/
@@ -159,4 +279,18 @@
public Object getUpdateLock() {
return this;
}
+
+ public void close() {
+ if (closeClient) {
+ try {
+ zkClient.close();
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ }
+ }
+ }
}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java Fri Feb 12 17:59:41 2010
@@ -1,11 +1,7 @@
package org.apache.solr.cloud;
-import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
-import java.util.Collections;
-import java.util.HashMap;
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkClientConnectionStrategy.java Fri Feb 12 17:59:41 2010
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.cloud.SolrZooKeeper;
import org.apache.zookeeper.Watcher;
/**
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Fri Feb 12 17:59:41 2010
@@ -20,18 +20,17 @@
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.solr.cloud.SolrZkClient.OnReconnect;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.SolrParams;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -63,10 +62,8 @@
// package private for tests
- static final String SHARDS_ZKNODE = "/shards";
+
static final String CONFIGS_ZKNODE = "/configs";
- static final String COLLECTIONS_ZKNODE = "/collections";
- static final String NODES_ZKNODE = "/live_nodes";
public static final String URL_PROP = "url";
public static final String NODE_NAME = "node_name";
@@ -85,10 +82,6 @@
private String localHost;
private String hostName;
-
-
-
- private boolean readonly; // temporary hack to enable reuse in SolrJ client
/**
* @param zkServerAddress ZooKeeper server host address
@@ -108,7 +101,6 @@
this.localHostPort = locaHostPort;
this.localHostContext = localHostContext;
this.localHost = localHost;
- this.readonly = localHostPort==null;
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
// on reconnect, reload cloud info
@@ -150,7 +142,7 @@
*/
private void addZkShardsNode(String shardId, String collection) throws IOException, InterruptedException, KeeperException {
- String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE + "/" + shardId;
+ String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
try {
@@ -164,7 +156,7 @@
// nocommit - scrutinize
// ping that there is a new shardId
- zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
}
} catch (KeeperException e) {
@@ -280,7 +272,7 @@
// makes nodes zkNode
try {
- zkClient.makePath(NODES_ZKNODE);
+ zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
} catch (KeeperException e) {
// its okay if another beats us creating the node
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -314,7 +306,7 @@
private void createEphemeralLiveNode() throws KeeperException,
InterruptedException {
String nodeName = getNodeName();
- String nodePath = NODES_ZKNODE + "/" + nodeName;
+ String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
log.info("Register node as live in ZooKeeper:" + nodePath);
Watcher liveNodeWatcher = new Watcher() {
@@ -347,26 +339,24 @@
};
try {
- if (!readonly) {
- boolean nodeDeleted = true;
- try {
- // we attempt a delete in the case of a quick server bounce -
- // if there was not a graceful shutdown, the node may exist
- // until expiration timeout - so a node won't be created here because
- // it exists, but eventually the node will be removed. So delete
- // in case it exists and create a new node.
- zkClient.delete(nodePath, -1);
- } catch (KeeperException.NoNodeException e) {
- // fine if there is nothing to delete
- nodeDeleted = false;
- }
- if (nodeDeleted) {
- log
- .info("Found a previous node that still exists while trying to register a new live node "
- + nodePath + " - removing existing node to create another.");
- }
- zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+ boolean nodeDeleted = true;
+ try {
+ // we attempt a delete in the case of a quick server bounce -
+ // if there was not a graceful shutdown, the node may exist
+ // until expiration timeout - so a node won't be created here because
+ // it exists, but eventually the node will be removed. So delete
+ // in case it exists and create a new node.
+ zkClient.delete(nodePath, -1);
+ } catch (KeeperException.NoNodeException e) {
+ // fine if there is nothing to delete
+ nodeDeleted = false;
+ }
+ if (nodeDeleted) {
+ log
+ .info("Found a previous node that still exists while trying to register a new live node "
+ + nodePath + " - removing existing node to create another.");
}
+ zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
} catch (KeeperException e) {
// its okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -374,15 +364,13 @@
}
System.out.println("NODE ALREADY EXISTS");
}
- zkClient.getChildren(NODES_ZKNODE, liveNodeWatcher);
+ zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
}
public String getNodeName() {
return hostName + ":" + localHostPort + "_"+ localHostContext;
}
-
-
/**
* @param path
* @return
@@ -406,7 +394,7 @@
String configName = null;
- String path = COLLECTIONS_ZKNODE + "/" + collection;
+ String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
if (log.isInfoEnabled()) {
log.info("Load collection config from:" + path);
}
@@ -444,7 +432,7 @@
String collection = cloudDesc.getCollectionName();
- String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
+ String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
@@ -469,14 +457,14 @@
if(shardZkNodeAlreadyExists && forcePropsUpdate) {
zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
// tell everyone to update cloud info
- zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
} else {
addZkShardsNode(cloudDesc.getShardId(), collection);
try {
zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
CreateMode.PERSISTENT);
// tell everyone to update cloud info
- zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
} catch (KeeperException e) {
// its okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -536,12 +524,12 @@
private void setUpCollectionsNode() throws KeeperException, InterruptedException {
try {
- if (!zkClient.exists(COLLECTIONS_ZKNODE) && !readonly) {
+ if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
if (log.isInfoEnabled()) {
- log.info("creating zk collections node:" + COLLECTIONS_ZKNODE);
+ log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
}
// makes collections zkNode if it doesn't exist
- zkClient.makePath(COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
}
} catch (KeeperException e) {
// its okay if another beats us creating the node
@@ -559,13 +547,13 @@
}
log.info("Start watching collections zk node for changes");
- zkClient.getChildren(COLLECTIONS_ZKNODE, new Watcher(){
+ zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
public void process(WatchedEvent event) {
try {
log.info("Detected a new or removed collection");
synchronized (zkStateReader.getUpdateLock()) {
- addShardZkNodeWatches();
+ zkStateReader.addShardZkNodeWatches();
zkStateReader.updateCloudState(false);
}
// re-watch
@@ -588,7 +576,7 @@
}});
- zkClient.exists(COLLECTIONS_ZKNODE, new Watcher(){
+ zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
public void process(WatchedEvent event) {
if(event.getType() != EventType.NodeDataChanged) {
@@ -597,10 +585,10 @@
log.info("Notified of CloudState change");
try {
synchronized (zkStateReader.getUpdateLock()) {
- addShardZkNodeWatches();
+ zkStateReader.addShardZkNodeWatches();
zkStateReader.updateCloudState(false);
}
- zkClient.exists(COLLECTIONS_ZKNODE, this);
+ zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, this);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -619,120 +607,12 @@
}});
}
-
- public void addShardZkNodeWatches() throws KeeperException, InterruptedException {
- CloudState cloudState = getCloudState();
- Set<String> knownCollections = cloudState.getCollections();
-
- List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
-
- for(final String collection : collections) {
- if(!knownCollections.contains(collection)) {
- log.info("Found new collection:" + collection);
- Watcher watcher = new Watcher() {
- public void process(WatchedEvent event) {
- log.info("Detected changed ShardId in collection:" + collection);
- try {
- addShardsWatches(collection);
- zkStateReader.updateCloudState(false);
- } catch (KeeperException e) {
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
- }
- };
- boolean madeWatch = true;
- String shardZkNode = COLLECTIONS_ZKNODE + "/" + collection
- + SHARDS_ZKNODE;
- for (int i = 0; i < 5; i++) {
- try {
- zkClient.getChildren(shardZkNode, watcher);
- } catch (KeeperException.NoNodeException e) {
- // most likely, the collections node has been created, but not the
- // shards node yet -- pause and try again
- madeWatch = false;
- if (i == 4) {
- log.error("Could not set shards zknode watch, because the zknode does not exist:" + shardZkNode);
- break;
- }
- Thread.sleep(100);
- }
- if (madeWatch) {
- log.info("Made shard watch:" + shardZkNode);
- break;
- }
- }
- }
- }
- }
-
- public void addShardsWatches(final String collection) throws KeeperException,
- InterruptedException {
- if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
- List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
- + collection + SHARDS_ZKNODE, null);
- CloudState cloudState = getCloudState();
- Set<String> knownShardIds;
- Map<String,Slice> slices = cloudState.getSlices(collection);
- if (slices != null) {
- knownShardIds = slices.keySet();
- } else {
- knownShardIds = new HashSet<String>(0);
- }
- for (final String shardId : shardIds) {
- if (!knownShardIds.contains(shardId)) {
- zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
- + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
-
- public void process(WatchedEvent event) {
- log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
- try {
- zkStateReader.updateCloudState(false);
- } catch (KeeperException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- }
- });
- }
- }
- }
- }
-
- public void addShardsWatches() throws KeeperException, InterruptedException {
- List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
- for (final String collection : collections) {
- addShardsWatches(collection);
- }
- }
public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
String collection = cd.getCollectionName();
log.info("Check for collection zkNode:" + collection);
- String collectionPath = COLLECTIONS_ZKNODE + "/" + collection;
+ String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
try {
if(!zkClient.exists(collectionPath)) {
@@ -800,7 +680,7 @@
zkClient.makePath(collectionPath, collectionProps.store(), CreateMode.PERSISTENT, null, true);
// ping that there is a new collection
- zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
} catch (KeeperException e) {
// its okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryComponent.java Fri Feb 12 17:59:41 2010
@@ -27,6 +27,7 @@
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
Modified: lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java (original)
+++ lucene/solr/branches/cloud/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java Fri Feb 12 17:59:41 2010
@@ -1,20 +1,31 @@
package org.apache.solr.client.solrj.impl;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.cloud.*;
+import org.apache.solr.cloud.Slice;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkNodeProps;
+import org.apache.solr.cloud.ZooKeeperException;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.zookeeper.KeeperException;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.*;
-import java.util.concurrent.TimeoutException;
-
public class CloudSolrServer extends SolrServer {
- private volatile ZkController zkController;
+ private volatile ZkStateReader zkStateReader;
private String zkHost; // the zk server address
private int zkConnectTimeout = 10000;
private int zkClientTimeout = 10000;
@@ -61,14 +72,15 @@
* @throws InterruptedException
*/
public void connect() {
- if (zkController != null) return;
+ if (zkStateReader != null) return;
synchronized(this) {
- if (zkController != null) return;
+ if (zkStateReader != null) return;
try {
- ZkController zk = new ZkController(zkHost, zkConnectTimeout, zkClientTimeout, null, null, null);
+ ZkStateReader zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
+ // nocommit : deal with other watches
zk.addShardZkNodeWatches();
- zk.getZkStateReader().updateCloudState(true);
- zkController = zk;
+ zk.updateCloudState(true);
+ zkStateReader = zk;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@@ -89,7 +101,7 @@
public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
connect();
- CloudState cloudState = zkController.getCloudState();
+ CloudState cloudState = zkStateReader.getCloudState();
String collection = request.getParams().get("collection", defaultCollection);
@@ -126,11 +138,11 @@
}
public void close() {
- if (zkController != null) {
+ if (zkStateReader != null) {
synchronized(this) {
- if (zkController != null)
- zkController.close();
- zkController = null;
+ if (zkStateReader!= null)
+ zkStateReader.close();
+ zkStateReader = null;
}
}
}
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Fri Feb 12 17:59:41 2010
@@ -21,6 +21,7 @@
import java.util.HashSet;
import org.apache.solr.BaseDistributedSearchTestCase;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.TestHarness;
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Fri Feb 12 17:59:41 2010
@@ -19,6 +19,7 @@
import java.io.File;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.AbstractSolrTestCase;
import org.apache.solr.util.TestHarness;
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Fri Feb 12 17:59:41 2010
@@ -23,6 +23,8 @@
import junit.framework.TestCase;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
@@ -185,7 +187,7 @@
// quickly kill / start client
- container2.getZkController().getZkClient().keeper.getConnection()
+ container2.getZkController().getZkClient().getSolrZooKeeper().getConnection()
.disconnect();
container2.shutdown();
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java Fri Feb 12 17:59:41 2010
@@ -23,6 +23,9 @@
import junit.framework.TestCase;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -79,7 +82,7 @@
zkController = new ZkController(AbstractZkTestCase.ZOO_KEEPER_ADDRESS,
TIMEOUT, 1000, "localhost", "8983", "/solr");
- zkController.updateCloudState(true);
+ zkController.getZkStateReader().updateCloudState(true);
CloudState cloudInfo = zkController.getCloudState();
Map<String,Slice> slices = cloudInfo.getSlices("collection1");
assertNotNull(slices);
@@ -141,7 +144,7 @@
ZkNodeProps props = new ZkNodeProps();
props.put("configName", actualConfigName);
- zkClient.makePath(ZkController.COLLECTIONS_ZKNODE + "/" + COLLECTION_NAME , props.store(), CreateMode.PERSISTENT);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION_NAME , props.store(), CreateMode.PERSISTENT);
if (DEBUG) {
zkClient.printLayoutToStdOut();
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java?rev=909537&r1=909536&r2=909537&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java Fri Feb 12 17:59:41 2010
@@ -21,6 +21,7 @@
import junit.framework.TestCase;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;