You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/01/12 15:56:49 UTC
svn commit: r898348 - in /lucene/solr/branches/cloud/src:
java/org/apache/solr/cloud/ java/org/apache/solr/core/
java/org/apache/solr/handler/component/ test/org/apache/solr/cloud/
Author: markrmiller
Date: Tue Jan 12 14:56:48 2010
New Revision: 898348
URL: http://svn.apache.org/viewvc?rev=898348&view=rev
Log:
many updates and additions
Added:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
Modified:
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java
lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java
lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
Added: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=898348&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java (added)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudDescriptor.java Tue Jan 12 14:56:48 2010
@@ -0,0 +1,64 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Mutable holder for three cloud-related string settings: a role, a
+ * collection name and a shard list. Values are stored exactly as given.
+ */
+public class CloudDescriptor {
+  // nocommit : zk
+
+  /** Role; starts out as the literal "none". */
+  private String role = "none";
+  /** Collection name; null until set. */
+  private String collectionName;
+  /** Shard list; null until set. */
+  private String shardList;
+
+  /** @return the current role, "none" unless it has been replaced */
+  public String getRole() {
+    return this.role;
+  }
+
+  /** @param role value that replaces the current role */
+  public void setRole(String role) {
+    this.role = role;
+  }
+
+  /** @return the collection name, or null if none has been set */
+  public String getCollectionName() {
+    return this.collectionName;
+  }
+
+  /** @param collectionName value stored as the collection name */
+  public void setCollectionName(String collectionName) {
+    this.collectionName = collectionName;
+  }
+
+  //nocommit: may be null
+  /** @return the shard list, or null if none has been set */
+  public String getShardList() {
+    return this.shardList;
+  }
+
+  /** @param shardList value stored as the shard list */
+  public void setShardList(String shardList) {
+    this.shardList = shardList;
+  }
+}
Added: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java?rev=898348&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java (added)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudInfo.java Tue Jan 12 14:56:48 2010
@@ -0,0 +1,51 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * In-memory lookup table from a collection name to its {@link CollectionInfo}.
+ * Entries live in a plain {@link HashMap}; no synchronization is added here.
+ */
+public class CloudInfo {
+  // The map reference is fixed at construction; only its contents change.
+  private final Map<String,CollectionInfo> collectionInfos = new HashMap<String,CollectionInfo>();
+
+  //nocommit
+  /**
+   * Stores an entry under the given name, replacing any entry already stored for it.
+   *
+   * @param collection name to store the entry under
+   * @param collectionInfo entry to associate with that name
+   */
+  public void addCollectionInfo(String collection, CollectionInfo collectionInfo) {
+    collectionInfos.put(collection, collectionInfo);
+  }
+
+  /**
+   * Looks up the entry stored under the given name.
+   *
+   * @param collection name to look up
+   * @return the stored entry, or null when the map holds no value for that name
+   */
+  public CollectionInfo getCollectionInfo(String collection) {
+    return collectionInfos.get(collection);
+  }
+}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CollectionInfo.java Tue Jan 12 14:56:48 2010
@@ -40,8 +40,8 @@
.getLogger(CollectionInfo.class);
static final String SHARD_LIST_PROP = "shard_list";
-
static final String URL_PROP = "url";
+ static final String ROLE_PROP = "role";
// maps shard name to the shard addresses and roles
private final Map<String,ShardInfoList> shardNameToShardInfoList;
@@ -77,7 +77,7 @@
HashMap<String,ShardInfoList> shardNameToShardList = new HashMap<String,ShardInfoList>();
if (zkClient.exists(path, null) == null) {
- throw new IllegalStateException("Cannot find zk node that should exist:"
+ throw new IllegalStateException("Cannot find zk shards node that should exist:"
+ path);
}
List<String> nodes = zkClient.getChildren(path, null);
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ConnectionManager.java Tue Jan 12 14:56:48 2010
@@ -21,6 +21,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.cloud.SolrZkClient.OnReconnect;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -45,12 +46,15 @@
private SolrZkClient client;
- public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat) {
+ private OnReconnect onReconnect;
+
+ public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
this.name = name;
this.client = client;
this.connectionStrategy = strat;
this.zkServerAddress = zkServerAddress;
this.zkClientTimeout = zkClientTimeout;
+ this.onReconnect = onConnect;
reset();
}
@@ -62,7 +66,8 @@
public synchronized void process(WatchedEvent event) {
if (log.isInfoEnabled()) {
- log.info("Watcher " + this + " name:" + name + " got event " + event);
+ log.info("Watcher " + this + " name:" + name + " got event " + event
+ + " path:" + event.getPath() + " type:" + event.getType());
}
state = event.getState();
@@ -72,9 +77,6 @@
} else if (state == KeeperState.Expired) {
connected = false;
log.info("Attempting to reconnect to ZooKeeper...");
- boolean connected = true;
-
- // nocommit : close old ZooKeeper client?
try {
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
@@ -82,26 +84,36 @@
public void update(ZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
waitForConnected(SolrZkClient.CONNECT_TIMEOUT);
client.updateKeeper(keeper);
+ if(onReconnect != null) {
+ onReconnect.command();
+ }
ConnectionManager.this.connected = true;
}
});
} catch (Exception e) {
- // TODO Auto-generated catch block
+ // nocommit
e.printStackTrace();
}
log.info("Connected:" + connected);
- // nocommit: start reconnect attempts
} else if (state == KeeperState.Disconnected) {
+ if(connected == false) {
+ // nocommit
+ System.out.println("we already know we are dc'd - why are we notified twice?");
+ return;
+ }
connected = false;
- // nocommit: start reconnect attempts
-
+ // nocommit: start reconnect attempts - problem if this is shutdown related?
+
try {
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
@Override
public void update(ZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
waitForConnected(SolrZkClient.CONNECT_TIMEOUT);
client.updateKeeper(keeper);
+ if(onReconnect != null) {
+ onReconnect.command();
+ }
ConnectionManager.this.connected = true;
}
});
@@ -124,9 +136,8 @@
return state;
}
- public synchronized ZooKeeper waitForConnected(long waitForConnection)
+ public synchronized void waitForConnected(long waitForConnection)
throws InterruptedException, TimeoutException, IOException {
- ZooKeeper keeper = new ZooKeeper(zkServerAddress, zkClientTimeout, this);
long expire = System.currentTimeMillis() + waitForConnection;
long left = waitForConnection;
while (!connected && left > 0) {
@@ -136,7 +147,6 @@
if (!connected) {
throw new TimeoutException("Could not connect to ZooKeeper within " + waitForConnection + " ms");
}
- return keeper;
}
public synchronized void waitForDisconnected(long timeout)
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/DefaultConnectionStrategy.java Tue Jan 12 14:56:48 2010
@@ -18,24 +18,49 @@
*/
import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- *
+ * nocommit : default needs backoff retry reconnection attempts
*/
public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
+ private static Logger log = LoggerFactory.getLogger(DefaultConnectionStrategy.class);
+ private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+
@Override
public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
updater.update(new ZooKeeper(serverAddress, timeout, watcher));
}
@Override
- public void reconnect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
- updater.update(new ZooKeeper(serverAddress, timeout, watcher));
+ public void reconnect(final String serverAddress, final int zkClientTimeout,
+ final Watcher watcher, final ZkUpdate updater) throws IOException {
+ log.info("Starting reconnect to ZooKeeper attempts ...");
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ log.info("Attempting the connect...");
+ try {
+ updater.update(new ZooKeeper(serverAddress, zkClientTimeout, watcher));
+ // nocommit
+ log.info("Reconnected to ZooKeeper");
+ } catch (Exception e) {
+ // nocommit
+ e.printStackTrace();
+ log.info("Reconnect to ZooKeeper failed");
+ }
+ executor.shutdownNow();
+
+ }
+ }, 0, 1000, TimeUnit.MILLISECONDS); // nocommit : we actually want to do backoff retry
}
}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardInfo.java Tue Jan 12 14:56:48 2010
@@ -24,9 +24,12 @@
public final class ShardInfo {
private final String url;
- //nocommit do role based on existing ReplicationHandler role detection?
+
+ // nocommit do role based on existing ReplicationHandler role detection?
private final Role role;
+ private String zkNodeName;
+
public ShardInfo(String url) {
this.url = url;
role = Role.SLAVE;
@@ -44,7 +47,14 @@
public String getUrl() {
return url;
}
-
+
+ public String getZkNodeName() {
+ return zkNodeName;
+ }
+
+ public void setZkNodeName(String zkNodeName) {
+ this.zkNodeName = zkNodeName;
+ }
enum Role {
MASTER, SLAVE
Added: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java?rev=898348&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java (added)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ShardsWatcher.java Tue Jan 12 14:56:48 2010
@@ -0,0 +1,84 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// nocommit - explore handling shard changes
+/**
+ * ZooKeeper {@link Watcher} that watches the shards zkNode and asks its
+ * {@link ZkController} to reload the cloud info whenever an event arrives.
+ */
+class ShardsWatcher implements Watcher {
+
+  private static final Logger log = LoggerFactory.getLogger(ShardsWatcher.class);
+
+  // thread safe
+  private final ZkController controller;
+
+  /**
+   * @param controller controller whose cloud info is reloaded on each event
+   */
+  public ShardsWatcher(ZkController controller) {
+    this.controller = controller;
+  }
+
+  /**
+   * Reloads the controller's cloud info in response to a watch event. The
+   * event itself is not inspected.
+   *
+   * @param event the notification delivered to this watcher
+   * @throws SolrException (SERVER_ERROR) wrapping a KeeperException or
+   *         IOException raised by the reload
+   */
+  public void process(WatchedEvent event) {
+    // nocommit : this will be called too often as shards register themselves?
+    log.info("shard node changed");
+
+    try {
+      // nocommit : refresh watcher
+      // NOTE(review): ZooKeeper watches are one-time triggers, so unless something
+      // else sets the watch again, later changes will not reach this watcher - confirm.
+      // controller.getKeeperConnection().exists(event.getPath(), this);
+
+      // TODO: need to load whole state?
+      controller.loadCloudInfo();
+
+    } catch (KeeperException e) {
+      log.error("", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "ZooKeeper Exception", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+    } catch (IOException e) {
+      log.error("", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "IOException", e);
+    }
+
+  }
+
+}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/SolrZkClient.java Tue Jan 12 14:56:48 2010
@@ -25,6 +25,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.solr.cloud.ZkClientConnectionStrategy.ZkUpdate;
+import org.apache.solr.common.SolrException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
@@ -45,17 +46,21 @@
*/
public class SolrZkClient {
static final String NEWL = System.getProperty("line.separator");
+
+ public static interface OnReconnect {
+ public void command();
+ }
static final int CONNECT_TIMEOUT = 5000;
- protected static final Logger log = LoggerFactory
+ private static final Logger log = LoggerFactory
.getLogger(SolrZkClient.class);
boolean connected = false;
private ConnectionManager connManager;
- private volatile ZooKeeper keeper;
+ volatile ZooKeeper keeper;
/**
* @param zkServerAddress
@@ -65,7 +70,11 @@
* @throws IOException
*/
public SolrZkClient(String zkServerAddress, int zkClientTimeout) throws InterruptedException, TimeoutException, IOException {
- this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy());
+ this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
+ }
+
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout, OnReconnect onReonnect) throws InterruptedException, TimeoutException, IOException {
+ this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), onReonnect);
}
/**
@@ -77,23 +86,24 @@
* @throws IOException
*/
public SolrZkClient(String zkServerAddress, int zkClientTimeout,
- ZkClientConnectionStrategy strat) throws InterruptedException,
+ ZkClientConnectionStrategy strat, final OnReconnect onReconnect) throws InterruptedException,
TimeoutException, IOException {
- connManager = new ConnectionManager("ZooKeeperConnection Watcher:" + zkServerAddress, this,
- zkServerAddress, zkClientTimeout, strat);
- strat.connect(zkServerAddress, zkClientTimeout, connManager, new ZkUpdate() {
- @Override
- public void update(ZooKeeper zooKeeper) {
- if(keeper != null) {
- try {
- keeper.close();
- } catch (InterruptedException e) {
- // nocommit
+ connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
+ strat.connect(zkServerAddress, zkClientTimeout, connManager,
+ new ZkUpdate() {
+ @Override
+ public void update(ZooKeeper zooKeeper) {
+ if (keeper != null) {
+ try {
+ keeper.close();
+ } catch (InterruptedException e) {
+ // nocommit
+ }
+ }
+ keeper = zooKeeper;
}
- }
- keeper = zooKeeper;
- }
- });
+ });
connManager.waitForConnected(CONNECT_TIMEOUT);
}
@@ -136,6 +146,17 @@
throws KeeperException, InterruptedException {
return keeper.exists(path, watcher);
}
+
+ /**
+ * @param path
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public boolean exists(final String path)
+ throws KeeperException, InterruptedException {
+ return keeper.exists(path, null) != null;
+ }
/**
* @param path
@@ -286,7 +307,7 @@
mode = createMode;
bytes = data;
}
- create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+ keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
// set new watch
exists(currentPath, watcher);
} else if (i == paths.length - 1) {
@@ -318,7 +339,7 @@
* @throws KeeperException
* @throws InterruptedException
*/
- public void write(String path, byte[] data) throws KeeperException,
+ public void setData(String path, byte[] data) throws KeeperException,
InterruptedException {
makePath(path);
@@ -340,14 +361,14 @@
* @throws KeeperException
* @throws InterruptedException
*/
- public void write(String path, File file) throws IOException,
+ public void setData(String path, File file) throws IOException,
KeeperException, InterruptedException {
if (log.isInfoEnabled()) {
log.info("Write to ZooKeepeer " + file.getAbsolutePath() + " to " + path);
}
String data = FileUtils.readFileToString(file);
- write(path, data.getBytes());
+ setData(path, data.getBytes());
}
/**
@@ -419,7 +440,7 @@
*/
void updateKeeper(ZooKeeper keeper) {
// nocommit
- log.info("Updating ZooKeeper instance");
+ log.info("Updating ZooKeeper instance:" + keeper);
this.keeper = keeper;
}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Tue Jan 12 14:56:48 2010
@@ -19,6 +19,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
@@ -34,6 +35,7 @@
import javax.xml.parsers.ParserConfigurationException;
+import org.apache.solr.cloud.SolrZkClient.OnReconnect;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
@@ -43,6 +45,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,177 +57,102 @@
* notes: loads everything on init, creates what's not there - further updates
* are prompted with Watches.
*
- * TODO: handle ZooKeeper goes down / failures, Solr still runs
*/
public final class ZkController {
- static final String NEWL = System.getProperty("line.separator");
-
- private static final String COLLECTIONS_ZKNODE = "/collections/";
-
- static final String NODE_ZKPREFIX = "/node";
-
- private static final String SHARDS_ZKNODE = "/shards";
-
- static final String PROPS_DESC = "NodeDesc";
-
-
-
-
- private static final String CONFIGS_ZKNODE = "/configs/";
-
-
- // nocommit - explore handling shard changes
- // watches the shards zkNode
- static class ShardsWatcher implements Watcher {
-
- private ZkController controller;
-
- public ShardsWatcher(ZkController controller) {
- this.controller = controller;
- }
-
- public void process(WatchedEvent event) {
- // nocommit : this will be called too often as shards register themselves
- System.out.println("shards changed");
-
- try {
- // refresh watcher
- //controller.getKeeperConnection().exists(event.getPath(), this);
-
- // TODO: need to load whole state?
- controller.loadCollectionInfo();
-
- } catch (KeeperException e) {
- log.error("", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "ZooKeeper Exception", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- } catch (IOException e) {
- log.error("", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "IOException", e);
- }
- }
-
- }
+ private static Logger log = LoggerFactory.getLogger(ZkController.class);
- final ShardsWatcher SHARD_WATCHER = new ShardsWatcher(this);
+ static final String NEWL = System.getProperty("line.separator");
private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
-
private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
- private static Logger log = LoggerFactory
- .getLogger(ZkController.class);
-
- private SolrZkClient zkClient;
+ // package private for tests
+ static final String CORE_ZKPREFIX = "/core";
+ static final String SHARDS_ZKNODE = "/shards";
+ // nocommit : ok to be public? for corecontainer access
+ public static final String CONFIGS_ZKNODE = "/configs";
+ static final String COLLECTIONS_ZKNODE = "/collections";
- SolrZkClient getKeeperConnection() {
- return zkClient;
- }
+ static final String PROPS_DESC = "CoreDesc";
- private String collectionName;
+ final ShardsWatcher shardWatcher = new ShardsWatcher(this);
- private volatile CollectionInfo collectionInfo;
+ private SolrZkClient zkClient;
- private String shardsZkPath;
+ private volatile CloudInfo cloudInfo;
private String zkServerAddress;
- private String hostPort;
-
- private String hostContext;
-
- private String configName;
-
- private String zooKeeperHostName;
-
- private String hostName;
+ private String localHostPort;
+ private String localHostContext;
+ private String localHostName;
+ private String localHost;
/**
*
* @param zkServerAddress ZooKeeper server host address
- * @param collection
- * @param hostName
- * @param hostPort
- * @param hostContext
* @param zkClientTimeout
- * @throws IOException
- * @throws TimeoutException
- * @throws InterruptedException
+ * @param collection
+ * @param localHost
+ * @param locaHostPort
+ * @param localHostContext
+ * @throws IOException
+ * @throws TimeoutException
+ * @throws InterruptedException
*/
- public ZkController(String zkServerAddress, String collection,
- String hostName, String hostPort, String hostContext, int zkClientTimeout) throws InterruptedException, TimeoutException, IOException {
+ public ZkController(String zkServerAddress, int zkClientTimeout, String localHost, String locaHostPort,
+ String localHostContext) throws InterruptedException,
+ TimeoutException, IOException {
- this.collectionName = collection;
this.zkServerAddress = zkServerAddress;
- this.hostPort = hostPort;
- this.hostContext = hostContext;
- this.hostName = hostName;
- zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout);
-
- shardsZkPath = COLLECTIONS_ZKNODE + collectionName + SHARDS_ZKNODE;
+ this.localHostPort = locaHostPort;
+ this.localHostContext = localHostContext;
+ this.localHost = localHost;
+ zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout,
+ // on reconnect, reload cloud info
+ new OnReconnect() {
+
+ public void command() {
+ try {
+ loadCloudInfo();
+ } catch (KeeperException e) {
+ log.error("ZooKeeper Exception", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "ZooKeeper Exception", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ }
+ }
+ });
+
init();
}
- private void init() {
-
- try {
- zooKeeperHostName = getHostAddress();
- Matcher m = URL_POST.matcher(zooKeeperHostName);
- if (m.matches()) {
- String hostName = m.group(1);
- // register host
- zkClient.makePath(hostName);
- } else {
- // nocommit
- throw new IllegalStateException("Bad host:" + zooKeeperHostName);
- }
-
- // build layout if not exists
- buildZkLayoutZkNodes();
-
- configName = readConfigName(collectionName);
-
- // load the state of the cloud
- loadCollectionInfo();
-
- } catch (IOException e) {
- log.error("", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Can't create ZooKeeperController", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- } catch (KeeperException e) {
- log.error("KeeperException", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
- }
-
- }
-
- public boolean configFileExists(String fileName) throws KeeperException,
- InterruptedException {
- return configFileExists(configName, fileName);
- }
-
/**
* nocommit: adds nodes if they don't exist, eg /shards/ node. consider race
* conditions
*/
- private void buildZkLayoutZkNodes() throws IOException {
+ private void addZkShardsNode(String collection) throws IOException {
+
+ String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE;
try {
// shards node
- if (!exists(shardsZkPath)) {
+ if (!zkClient.exists(shardsZkPath)) {
if (log.isInfoEnabled()) {
log.info("creating zk shards node:" + shardsZkPath);
}
// makes shards zkNode if it doesn't exist
- zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, SHARD_WATCHER);
+ zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
+
+ // ping that there is a new collection
+ zkClient.setData(COLLECTIONS_ZKNODE, (byte[])null);
}
} catch (KeeperException e) {
// its okay if another beats us creating the node
@@ -237,17 +165,10 @@
// Restore the interrupted status
Thread.currentThread().interrupt();
}
-
- // no watch the shards node
- try {
- zkClient.exists(shardsZkPath, new Watcher(){
- public void process(WatchedEvent event) {
- // nocommit
- // the shards node has been updated
- // we need to look for new nodes
-
- }});
+ // now watch the shards node
+ try {
+ zkClient.exists(shardsZkPath, shardWatcher);
} catch (KeeperException e) {
log.error("ZooKeeper Exception", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -271,42 +192,126 @@
}
/**
- * @return information about the current collection from ZooKeeper
+ * @param collection
+ * @param fileName
+ * @return
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public boolean configFileExists(String collection, String fileName)
+ throws KeeperException, InterruptedException {
+ Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + fileName, null);
+ return stat != null;
+ }
+
+ /**
+ * @return information about the cluster from ZooKeeper
+ */
+ public CloudInfo getCloudInfo() {
+ return cloudInfo;
+ }
+
+ public List<String> getCollectionNames() throws KeeperException,
+ InterruptedException {
+ // nocommit : watch for new collections?
+ List<String> collectionNodes = zkClient.getChildren(COLLECTIONS_ZKNODE,
+ null);
+
+ return collectionNodes;
+ }
+
+ /**
+ * Load SolrConfig from ZooKeeper.
+ *
+ * TODO: consider *many* cores firing up at once and loading the same files
+ * from ZooKeeper
+ *
+ * @param resourceLoader
+ * @param solrConfigFileName
+ * @return
+ * @throws IOException
+ * @throws ParserConfigurationException
+ * @throws SAXException
+ * @throws InterruptedException
+ * @throws KeeperException
*/
- public CollectionInfo getCollectionInfo() {
- return collectionInfo;
+ public SolrConfig getConfig(String zkConfigName, String solrConfigFileName,
+ SolrResourceLoader resourceLoader) throws IOException,
+ ParserConfigurationException, SAXException, KeeperException,
+ InterruptedException {
+ byte[] config = zkClient.getData(CONFIGS_ZKNODE + "/" + zkConfigName + "/"
+ + solrConfigFileName, null, null);
+ InputStream is = new ByteArrayInputStream(config);
+ SolrConfig cfg = solrConfigFileName == null ? new SolrConfig(
+ resourceLoader, SolrConfig.DEFAULT_CONF_FILE, is) : new SolrConfig(
+ resourceLoader, solrConfigFileName, is);
+
+ return cfg;
}
/**
+ * @param zkConfigName
+ * @param fileName
* @return
+ * @throws KeeperException
+ * @throws InterruptedException
*/
- public String getZkServerAddress() {
- return zkServerAddress;
+ public byte[] getConfigFileData(String zkConfigName, String fileName)
+ throws KeeperException, InterruptedException {
+ return zkClient.getData(CONFIGS_ZKNODE + "/" + zkConfigName, null, null);
}
- // load and publish a new CollectionInfo
- private void loadCollectionInfo() throws KeeperException, InterruptedException, IOException {
- // build immutable CollectionInfo
-
- CollectionInfo collectionInfo = new CollectionInfo(zkClient, shardsZkPath);
- // update volatile
- this.collectionInfo = collectionInfo;
+ // nocommit: fooling around
+ private String getHostAddress() throws IOException {
+
+ if (localHost == null) {
+ localHost = "http://" + InetAddress.getLocalHost().getHostName();
+ } else {
+ Matcher m = URL_PREFIX.matcher(localHost);
+ if (m.matches()) {
+ String prefix = m.group(1);
+ localHost = prefix + localHost;
+ } else {
+ localHost = "http://" + localHost;
+ }
+ }
+ if (log.isInfoEnabled()) {
+ log.info("Register host with ZooKeeper:" + localHost);
+ }
+
+ return localHost;
}
/**
- * @return name of configuration zkNode to use
+ * Load IndexSchema from ZooKeeper.
+ *
+ * TODO: consider *many* cores firing up at once and loading the same files
+ * from ZooKeeper
+ *
+ * @param resourceLoader
+ * @param schemaName
+ * @param config
+ * @return
+ * @throws InterruptedException
+ * @throws KeeperException
*/
- public String getConfigName() {
- return configName;
+ public IndexSchema getSchema(String zkConfigName, String schemaName,
+ SolrConfig config, SolrResourceLoader resourceLoader)
+ throws KeeperException, InterruptedException {
+ byte[] configBytes = zkClient.getData(CONFIGS_ZKNODE + "/" + zkConfigName
+ + "/" + schemaName, null, null);
+ InputStream is = new ByteArrayInputStream(configBytes);
+ IndexSchema schema = new IndexSchema(config, schemaName, is);
+ return schema;
}
// nocommit - testing
- public String getSearchNodes() {
+ public String getSearchNodes(String collection) {
StringBuilder nodeString = new StringBuilder();
boolean first = true;
List<String> nodes;
- nodes = collectionInfo.getSearchShards();
+ nodes = cloudInfo.getCollectionInfo(collection).getSearchShards();
// nocommit
System.out.println("there are " + nodes.size() + " node(s)");
for (String node : nodes) {
@@ -320,192 +325,116 @@
return nodeString.toString();
}
+ SolrZkClient getZkClient() {
+ return zkClient;
+ }
+
/**
- * Register shard. A SolrCore calls this on startup to register with
- * ZooKeeper.
- *
- * @param core
* @return
*/
- public String registerShard(SolrCore core) {
- String coreName = core.getCoreDescriptor().getName();
- String shardUrl = zooKeeperHostName + ":" + hostPort + "/" + hostContext
- + "/" + coreName;
+ public String getZkServerAddress() {
+ return zkServerAddress;
+ }
- // nocommit:
- if (log.isInfoEnabled()) {
- log.info("Register shard - core:" + core.getName() + " address:"
- + shardUrl);
- }
+ private void init() {
- String nodePath = null;
try {
- // create node
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // nocommit: could do xml
- Properties props = new Properties();
- props.put(CollectionInfo.URL_PROP, shardUrl);
-
- String shardList = core.getCoreDescriptor().getShardList();
-
- props.put(CollectionInfo.SHARD_LIST_PROP, shardList == null ? "" : shardList);
- props.store(baos, PROPS_DESC);
-
+ localHostName = getHostAddress();
+ Matcher m = URL_POST.matcher(localHostName);
+ if (m.matches()) {
+ String hostName = m.group(1);
+ // register host
+ zkClient.makePath(hostName);
+ } else {
+ // nocommit
+ throw new IllegalStateException("Unrecognized host:"
+ + localHostName);
+ }
- nodePath = zkClient.create(shardsZkPath + NODE_ZKPREFIX, baos
- .toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL);
// nocommit
- zkClient.exists(shardsZkPath, SHARD_WATCHER);
+ setUpCollectionsNode();
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- } catch (KeeperException e) {
- log.error("ZooKeeper Exception", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "ZooKeeper Exception", e);
} catch (IOException e) {
log.error("", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
-
- return nodePath;
- }
-
- /**
- * @param core
- * @param zkNodePath
- */
- public void unRegisterShard(SolrCore core, String zkNodePath) {
- // nocommit : version?
- try {
- zkClient.delete(zkNodePath, -1);
+ "Can't create ZooKeeperController", e);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
- } catch (KeeperException.NoNodeException e) {
- // nocommit - this is okay - for some reason the node is already gone
- log.warn("Unregistering core: " + core.getName()
- + " but core's ZooKeeper node has already been removed");
} catch (KeeperException e) {
- log.error("ZooKeeper Exception", e);
- // we can't get through to ZooKeeper, so log error
- // and allow close process to continue -
- // if ZooKeeper is down, our ephemeral node
- // should be removed anyway
+ log.error("KeeperException", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
+
}
- // nocommit: fooling around
- private String getHostAddress() throws IOException {
+ // load and publish a new CloudInfo
+ public void loadCloudInfo() throws KeeperException, InterruptedException,
+ IOException {
+ // nocommit : not thread safe anymore ?
- if (hostName == null) {
- hostName = "http://" + InetAddress.getLocalHost().getHostName();
- } else {
- Matcher m = URL_PREFIX.matcher(hostName);
- if (m.matches()) {
- String prefix = m.group(1);
- hostName = prefix + hostName;
- } else {
- hostName = "http://" + hostName;
- }
- }
- if (log.isInfoEnabled()) {
- log.info("Register host with ZooKeeper:" + hostName);
+ log.info("Updating cloud state from ZooKeeper... :" + zkClient.keeper);
+
+ // build immutable CloudInfo
+ CloudInfo cloudInfo = new CloudInfo();
+ List<String> collections = getCollectionNames();
+ // nocommit : load all collection info
+ for (String collection : collections) {
+ String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection
+ + SHARDS_ZKNODE;
+ CollectionInfo collectionInfo = new CollectionInfo(zkClient, shardsZkPath);
+ cloudInfo.addCollectionInfo(collection, collectionInfo);
}
- return hostName;
- }
-
- /**
- * Check if path exists in ZooKeeper.
- *
- * @param path ZooKeeper path
- * @return true if path exists in ZooKeeper
- * @throws InterruptedException
- * @throws KeeperException
- */
- public boolean exists(String path) throws KeeperException,
- InterruptedException {
- Object exists = zkClient.exists(path, null);
-
- return exists != null;
+ // update volatile
+ this.cloudInfo = cloudInfo;
}
/**
- * Load SolrConfig from ZooKeeper.
- *
- * TODO: consider *many* cores firing up at once and loading the same files
- * from ZooKeeper
- *
- * @param resourceLoader
- * @param solrConfigFileName
+ * @param path
* @return
- * @throws IOException
- * @throws ParserConfigurationException
- * @throws SAXException
- * @throws InterruptedException
* @throws KeeperException
+ * @throws InterruptedException
*/
- public SolrConfig getConfig(String zkConfigName, String solrConfigFileName,
- SolrResourceLoader resourceLoader) throws IOException,
- ParserConfigurationException, SAXException, KeeperException,
+ public boolean pathExists(String path) throws KeeperException,
InterruptedException {
- byte[] config = zkClient.getData(CONFIGS_ZKNODE + zkConfigName
- + "/" + solrConfigFileName, null, null);
- InputStream is = new ByteArrayInputStream(config);
- SolrConfig cfg = solrConfigFileName == null ? new SolrConfig(
- resourceLoader, SolrConfig.DEFAULT_CONF_FILE, is) : new SolrConfig(
- resourceLoader, solrConfigFileName, is);
-
- return cfg;
- }
-
- public byte[] getConfigFileData(String zkConfigName, String fileName)
- throws KeeperException, InterruptedException {
- return zkClient.getData(CONFIGS_ZKNODE + zkConfigName, null, null);
+ return zkClient.exists(path);
}
/**
- * Load IndexSchema from ZooKeeper.
- *
- * TODO: consider *many* cores firing up at once and loading the same files
- * from ZooKeeper
- *
- * @param resourceLoader
- * @param schemaName
- * @param config
+ * @param collection
* @return
- * @throws InterruptedException
* @throws KeeperException
+ * @throws InterruptedException
*/
- public IndexSchema getSchema(String zkConfigName, String schemaName,
- SolrConfig config, SolrResourceLoader resourceLoader)
- throws KeeperException, InterruptedException {
- byte[] configBytes = zkClient.getData(CONFIGS_ZKNODE + zkConfigName
- + "/" + schemaName, null, null);
- InputStream is = new ByteArrayInputStream(configBytes);
- IndexSchema schema = new IndexSchema(config, schemaName, is);
- return schema;
- }
-
public String readConfigName(String collection) throws KeeperException,
InterruptedException {
// nocommit: load all config at once or organize differently (Properties?)
String configName = null;
- String path = COLLECTIONS_ZKNODE + collection;
+ String path = COLLECTIONS_ZKNODE + "/" + collection;
if (log.isInfoEnabled()) {
log.info("Load collection config from:" + path);
}
List<String> children;
try {
children = zkClient.getChildren(path, null);
- } catch(KeeperException.NoNodeException e) {
- log.error("Could not find config name to use for collection:" + collection, e);
+ } catch (KeeperException.NoNodeException e) {
+ // no config is set - check if there is only one config
+ // and if there is, use that
+ children = zkClient.getChildren(CONFIGS_ZKNODE, null);
+ if(children.size() == 1) {
+ String config = children.get(0);
+ log.info("No config set for " + collection + ", using single config found:" + config);
+ return config;
+ }
+
+ log.error(
+ "Multiple configurations were found, but config name to use for collection:"
+ + collection + " could not be located", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Timeout waiting for ZooKeeper connection", e);
+ "Multiple configurations were found, but config name to use for collection:"
+ + collection + " could not be located", e);
}
for (String node : children) {
// nocommit
@@ -516,12 +445,19 @@
if (log.isInfoEnabled()) {
log.info("Using collection config:" + configName);
}
+ // nocommit : bail or read more?
}
}
if (configName == null) {
+ children = zkClient.getChildren(CONFIGS_ZKNODE, null);
+ if(children.size() == 1) {
+ String config = children.get(0);
+ log.info("No config set for " + collection + ", using single config found:" + config);
+ return config;
+ }
throw new IllegalStateException("no config specified for collection:"
- + collection);
+ + collection + " " + children.size() + " configurations exist");
}
return configName;
@@ -536,20 +472,19 @@
* @throws KeeperException
* @throws IOException
*/
- public Map<String,ShardInfoList> readShardInfo(String path)
+ public Map<String,ShardInfoList> readShardsNode(String path)
throws KeeperException, InterruptedException, IOException {
- // for now, just reparse everything
+
HashMap<String,ShardInfoList> shardNameToShardList = new HashMap<String,ShardInfoList>();
- if (!exists(path)) {
+ if (!zkClient.exists(path)) {
throw new IllegalStateException("Cannot find zk node that should exist:"
+ path);
}
List<String> nodes = zkClient.getChildren(path, null);
for (String zkNodeName : nodes) {
- byte[] data = zkClient.getData(path + "/" + zkNodeName, null,
- null);
+ byte[] data = zkClient.getData(path + "/" + zkNodeName, null, null);
Properties props = new Properties();
props.load(new ByteArrayInputStream(data));
@@ -580,10 +515,163 @@
return Collections.unmodifiableMap(shardNameToShardList);
}
- public boolean configFileExists(String configName, String fileName)
- throws KeeperException, InterruptedException {
- Stat stat = zkClient.exists(CONFIGS_ZKNODE + configName, null);
- return stat != null;
+ /**
+ * Register shard. A SolrCore calls this on startup to register with
+ * ZooKeeper.
+ *
+ * @param core
+ * @return
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ public String register(SolrCore core) throws IOException,
+ KeeperException, InterruptedException {
+ String coreName = core.getCoreDescriptor().getName();
+ String shardUrl = localHostName + ":" + localHostPort + "/" + localHostContext
+ + "/" + coreName;
+
+ // nocommit:
+ if (log.isInfoEnabled()) {
+ log.info("Register shard - core:" + core.getName() + " address:"
+ + shardUrl);
+ }
+
+ CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+
+ String collection = cloudDesc.getCollectionName();
+ String nodePath = null;
+ String shardsZkPath = COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE;
+
+ // build layout if not exists
+ // nocommit : consider how we watch shards on all collections
+ addZkShardsNode(collection);
+
+ // create node
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ // nocommit: could do xml
+ Properties props = new Properties();
+ props.put(CollectionInfo.URL_PROP, shardUrl);
+
+ String shardList = cloudDesc.getShardList();
+
+ props.put(CollectionInfo.SHARD_LIST_PROP, shardList == null ? ""
+ : shardList);
+
+ props.put(CollectionInfo.ROLE_PROP, cloudDesc.getRole());
+
+ props.store(baos, PROPS_DESC);
+
+ nodePath = zkClient.create(shardsZkPath + CORE_ZKPREFIX,
+ baos.toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL);
+
+ return nodePath;
+ }
+
+ /**
+ * @param core
+ * @param zkNodePath
+ */
+ public void unregister(SolrCore core, String zkNodePath) {
+ // nocommit : version?
+ try {
+ zkClient.delete(zkNodePath, -1);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ } catch (KeeperException.NoNodeException e) {
+ // nocommit - this is okay - for some reason the node is already gone
+ log.warn("Unregistering core: " + core.getName()
+ + " but core's ZooKeeper node has already been removed");
+ } catch (KeeperException e) {
+ log.error("ZooKeeper Exception", e);
+ // we can't get through to ZooKeeper, so log error
+ // and allow close process to continue -
+ // if ZooKeeper is down, our ephemeral node
+ // should be removed anyway
+ }
+ }
+
+ /**
+ * @param dir
+ * @param zkPath
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void uploadDirToCloud(File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
+ File[] files = dir.listFiles();
+ for(File file : files) {
+ if (!file.getName().startsWith(".")) {
+ if (!file.isDirectory()) {
+ zkClient.setData(zkPath + "/" + file.getName(), file);
+ } else {
+ uploadDirToCloud(file, zkPath + "/" + file.getName());
+ }
+ }
+ }
+
+ }
+
+ // convenience for testing
+ void printLayoutToStdOut() throws KeeperException, InterruptedException {
+ zkClient.printLayoutToStdOut();
+ }
+
+ // nocommit
+ public void watchShards() throws KeeperException, InterruptedException {
+ List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, new Watcher() {
+
+ public void process(WatchedEvent event) {
+ System.out.println("Collections node event:" + event);
+
+ }});
+ }
+
+ private void setUpCollectionsNode() throws KeeperException, InterruptedException {
+ try {
+ if (!zkClient.exists(COLLECTIONS_ZKNODE)) {
+ if (log.isInfoEnabled()) {
+ log.info("creating zk collections node:" + COLLECTIONS_ZKNODE);
+ }
+ // makes collections zkNode if it doesn't exist
+ zkClient.makePath(COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
+ }
+ } catch (KeeperException e) {
+ // its okay if another beats us creating the node
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ log.error("ZooKeeper Exception", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "ZooKeeper Exception", e);
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+
+ log.info("Start watching collections node for changes");
+ zkClient.exists(COLLECTIONS_ZKNODE, new Watcher(){
+
+ public void process(WatchedEvent event) {
+ // nocommit
+ System.out.println("collections node changed: "+ event);
+ if(event.getType() == EventType.NodeDataChanged) {
+ // nocommit - we may have a new collection, watch the shards node for them
+
+ // re-watch
+ try {
+ zkClient.exists(COLLECTIONS_ZKNODE, this);
+ } catch (KeeperException e) {
+ log.error("ZooKeeper Exception", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "ZooKeeper Exception", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ }});
}
}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java Tue Jan 12 14:56:48 2010
@@ -69,8 +69,8 @@
//nocommit:
System.out.println("look for:" + file);
try {
- if (zkController.exists(file)) {
- byte[] bytes = zkController.getKeeperConnection().getData(getConfigDir() + "/" + resource, null, null);
+ if (zkController.pathExists(file)) {
+ byte[] bytes = zkController.getZkClient().getData(getConfigDir() + "/" + resource, null, null);
return new ByteArrayInputStream(bytes);
}
} catch (Exception e) {
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java Tue Jan 12 14:56:48 2010
@@ -54,7 +54,7 @@
*/
public class CoreContainer
{
- private static final String DEFAULT_CORE_NAME = "DEFAULT_CORE";
+ static final String DEFAULT_CORE_NAME = "DEFAULT_CORE";
protected static Logger log = LoggerFactory.getLogger(CoreContainer.class);
@@ -79,7 +79,6 @@
protected String solrConfigFilenameOverride;
protected String solrDataDirOverride;
protected String zkPortOverride;
- protected String collection;
private String testShardsListOverride;
private ZkController zooKeeperController;
@@ -92,6 +91,8 @@
private void initZooKeeper(String zkHost, int zkClientTimeout) {
// nocommit: perhaps get from solr.xml
+
+ // if zkHost sys property is not set, we are not using ZooKeeper
String zookeeperHost;
if(zkHost == null) {
zookeeperHost = System.getProperty("zkHost");
@@ -100,20 +101,39 @@
}
if (zookeeperHost != null) {
+ // we are ZooKeeper enabled
try {
- // nocommit : exceptions
- zooKeeperController = new ZkController(zookeeperHost, collection, host, hostPort, hostContext, zkClientTimeout);
+ zooKeeperController = new ZkController(zookeeperHost, zkClientTimeout, host, hostPort, hostContext);
+
+ String confDir = System.getProperty("bootstrapConfDir");
+ if(confDir != null) {
+ File dir = new File(confDir);
+ if(!dir.isDirectory()) {
+ throw new IllegalArgumentException("bootstrap conf dir must be directory");
+ }
+ String confName = System.getProperty("bootstrapConfName", "conf1");
+ zooKeeperController.uploadDirToCloud(dir, ZkController.CONFIGS_ZKNODE + confName);
+
+ }
+
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
} catch (TimeoutException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ log.error("Could not connect to ZooKeeper", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Could not connect to ZooKeeper", e);
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
}
}
+
}
public Properties getContainerProperties() {
@@ -283,7 +303,8 @@
hostContext = cfg.get("solr/cores/@hostContext", "solr");
host = cfg.get("solr/cores/@host", null);
- collection = cfg.get("solr/cores/@collection", "collection1"); //nocommit: default collection
+ // nocommit read from core
+ //collection = cfg.get("solr/cores/@collection", "collection1"); //nocommit: default collection
if(shareSchema){
indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
@@ -341,9 +362,17 @@
}
opt = DOMUtil.getAttr(node, "shardList", null);
if(testShardsListOverride != null && name.equals("")) {
- p.setShardList(testShardsListOverride);
+ p.getCloudDescriptor().setShardList(testShardsListOverride);
} else if(opt != null) {
- p.setShardList(opt);
+ p.getCloudDescriptor().setShardList(opt);
+ }
+ opt = DOMUtil.getAttr(node, "role", null);
+ if(opt != null) {
+ p.getCloudDescriptor().setRole(opt);
+ }
+ opt = DOMUtil.getAttr(node, "collection", null);
+ if (opt != null) {
+ p.getCloudDescriptor().setCollectionName(opt);
}
opt = DOMUtil.getAttr(node, "properties", null);
if (opt != null) {
@@ -364,13 +393,33 @@
SolrException.logOnce(log,null,ex);
}
}
- }
-
- finally {
+ } finally {
if (cfgis != null) {
try { cfgis.close(); } catch (Exception xany) {}
}
}
+
+
+ if(zooKeeperController != null) {
+ // nocommit : exceptions
+ try {
+ zooKeeperController.loadCloudInfo();
+
+ // nocommit : set shards node watches
+ zooKeeperController.watchShards();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ log.error("ZooKeeper Exception", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "ZooKeeper Exception", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
}
private Properties readProperties(Config cfg, Node node) throws XPathExpressionException {
@@ -479,16 +528,18 @@
String instanceDir = idir.getPath();
// Initialize the solr config
- SolrResourceLoader solrLoader ;
+ SolrResourceLoader solrLoader = null;
SolrConfig config = null;
+ String zkConfigName = null;
if(zooKeeperController == null) {
solrLoader = new SolrResourceLoader(instanceDir, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()));
config = new SolrConfig(solrLoader, dcore.getConfigName(), null);
} else {
- solrLoader = new ZkSolrResourceLoader(instanceDir, collection, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()), zooKeeperController);
try {
- config = zooKeeperController.getConfig(zooKeeperController.getConfigName(), dcore.getConfigName(), solrLoader);
+ zkConfigName = zooKeeperController.readConfigName(dcore.getCloudDescriptor().getCollectionName());
+ solrLoader = new ZkSolrResourceLoader(instanceDir, zkConfigName, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()), zooKeeperController);
+ config = zooKeeperController.getConfig(zkConfigName, dcore.getConfigName(), solrLoader);
} catch (KeeperException e) {
log.error("ZooKeeper Exception", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -496,7 +547,7 @@
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
- //nocommit: we may not know the config name - now what
+ //nocommit: not good - we can't continue
}
}
IndexSchema schema = null;
@@ -531,7 +582,8 @@
if(schema == null){
if(zooKeeperController != null) {
try {
- schema = zooKeeperController.getSchema(zooKeeperController.getConfigName(), dcore.getSchemaName(), config, solrLoader);
+ System.out.println("config:" + zkConfigName);
+ schema = zooKeeperController.getSchema(zkConfigName, dcore.getSchemaName(), config, solrLoader);
} catch (KeeperException e) {
log.error("ZooKeeper Exception", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreDescriptor.java Tue Jan 12 14:56:48 2010
@@ -20,6 +20,8 @@
import java.util.Properties;
import java.io.File;
+import org.apache.solr.cloud.CloudDescriptor;
+
/**
* A Solr core descriptor
*
@@ -34,11 +36,16 @@
protected String schemaName;
private final CoreContainer coreContainer;
private Properties coreProperties;
- private String shardList;
+
+ // nocommit : only filled when using ZooKeeper
+ private CloudDescriptor cloudDesc = new CloudDescriptor();
public CoreDescriptor(CoreContainer coreContainer, String name, String instanceDir) {
this.coreContainer = coreContainer;
this.name = name;
+
+ // cloud collection defaults to core name
+ this.cloudDesc.setCollectionName(name == "" ? CoreContainer.DEFAULT_CORE_NAME : name);
if (name == null) {
throw new RuntimeException("Core needs a name");
}
@@ -173,13 +180,7 @@
}
}
- public void setShardList(String shardList) {
- System.out.println("set shard list:" + shardList);
- this.shardList = shardList;
- }
-
- //nocommit: may be null
- public String getShardList() {
- return shardList;
+ public CloudDescriptor getCloudDescriptor() {
+ return cloudDesc;
}
}
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/SolrCore.java Tue Jan 12 14:56:48 2010
@@ -50,6 +50,7 @@
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.apache.solr.util.plugin.PluginInfoInitialized;
+import org.apache.zookeeper.KeeperException;
import org.apache.commons.io.IOUtils;
import org.xml.sax.SAXException;
@@ -531,7 +532,21 @@
zooKeeperComponent = cd.getCoreContainer().getZooKeeperController();
if(zooKeeperComponent != null) {
- this.zkNodePath = zooKeeperComponent.registerShard(this);
+ // load ZooKeeper - nocommit: somehow fall back to local configs?
+ try {
+ this.zkNodePath = zooKeeperComponent.register(this);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ log.error("ZooKeeper Exception", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "ZooKeeper Exception", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
}
//Initialize JMX
@@ -711,7 +726,7 @@
log.info(logid+" CLOSING SolrCore " + this);
// nocommit : if ZooKeeper, unregister core
if(zooKeeperComponent != null) {
- zooKeeperComponent.unRegisterShard(this, zkNodePath);
+ zooKeeperComponent.unregister(this, zkNodePath);
}
try {
infoRegistry.clear();
Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/handler/component/QueryElevationComponent.java Tue Jan 12 14:56:48 2010
@@ -175,7 +175,7 @@
// check if using ZooKeeper
ZkController zooKeeperController = core.getCoreDescriptor().getCoreContainer().getZooKeeperController();
if(zooKeeperController != null) {
- exists = zooKeeperController.configFileExists(f);
+ exists = zooKeeperController.configFileExists(core.getCoreDescriptor().getCloudDescriptor().getCollectionName(), f);
} else {
File fC = new File( core.getResourceLoader().getConfigDir(), f );
File fD = new File( core.getDataDir(), f );
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java Tue Jan 12 14:56:48 2010
@@ -85,18 +85,18 @@
RandVal.uniqueValues = new HashSet(); //reset random values
doTest();
- printeLayout();
+ printLayout();
destroyServers();
}
}
public void tearDown() throws Exception {
- printeLayout();
+ printLayout();
super.tearDown();
}
- private void printeLayout() throws Exception {
+ protected void printLayout() throws Exception {
SolrZkClient zkClient = new SolrZkClient(AbstractZkTestCase.JUST_HOST_NAME, AbstractZkTestCase.TIMEOUT);
zkClient.printLayoutToStdOut();
zkClient.close();
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/AbstractZkTestCase.java Tue Jan 12 14:56:48 2010
@@ -42,7 +42,8 @@
+ System.getProperty("file.separator") + getClass().getName() + "-"
+ System.currentTimeMillis());
- private ZkTestServer zkServer;
+ protected ZkTestServer zkServer;
+ protected String zkDir;
public AbstractZkTestCase() {
@@ -59,7 +60,7 @@
public void setUp() throws Exception {
try {
System.setProperty("zkHost", ZOO_KEEPER_ADDRESS);
- String zkDir = tmpDir.getAbsolutePath() + File.separator
+ zkDir = tmpDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
zkServer = new ZkTestServer(zkDir);
zkServer.run();
@@ -103,7 +104,7 @@
zkClient = new SolrZkClient(ZOO_KEEPER_ADDRESS, AbstractZkTestCase.TIMEOUT);
- zkClient.makePath("/collections/collection1/config=collection1");
+ //zkClient.makePath("/collections/collection1/config=collection1");
putConfig(zkClient, config);
putConfig(zkClient, schema);
@@ -119,7 +120,7 @@
}
private static void putConfig(SolrZkClient zkConnection, String name) throws Exception {
- zkConnection.write("/configs/collection1/" + name, new File("solr"
+ zkConnection.setData("/configs/conf1/" + name, new File("solr"
+ File.separator + "conf" + File.separator + name));
}
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Tue Jan 12 14:56:48 2010
@@ -21,6 +21,7 @@
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.core.CoreDescriptor;
/**
* nocommit:
@@ -73,6 +74,7 @@
public void testDistribSearch() throws Exception {
for (int nServers = 3; nServers < 4; nServers++) {
+ printLayout();
createServers(nServers);
RandVal.uniqueValues = new HashSet(); //reset random values
doTest();
@@ -127,6 +129,9 @@
query("q","*:*", "sort",f+" asc");
}
+ h.getCoreContainer().getCore("DEFAULT_CORE").close();
+ CoreDescriptor dcore= new CoreDescriptor( h.getCoreContainer(), "testcore", "testcore");
+ h.getCoreContainer().create(dcore);
// these queries should be exactly ordered and scores should exactly match
query("q","*:*", "sort",i1+" desc");
@@ -227,6 +232,8 @@
query("q","fox duplicate horses", "hl","true", "hl.fl", t1);
query("q","*:*", "rows",100);
}
+
+ super.printLayout();
// Thread.sleep(10000000000L);
}
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/BasicZkTest.java Tue Jan 12 14:56:48 2010
@@ -22,7 +22,6 @@
/**
- * TODO: assert config came from ZooKeeper
*
*/
public class BasicZkTest extends AbstractZkTestCase {
@@ -81,7 +80,14 @@
assertU(a, a);
}
assertU(commit());
-
+
+ zkServer.shutdown();
+ Thread.sleep(300);
+ // try a reconnect
+
+ zkServer = new ZkTestServer(zkDir);
+ zkServer.run();
+
// test maxint
assertQ(req("q", "id:[100 TO 110]", "rows", "2147483647"),
"//*[@numFound='4']");
@@ -102,6 +108,6 @@
assertQ(req("id:[100 TO 110]"), "//*[@numFound='0']");
//nocommit
- System.out.println("search nodes:" + h.getCoreContainer().getZooKeeperController().getSearchNodes());
+ System.out.println("search nodes:" + h.getCoreContainer().getZooKeeperController().getSearchNodes("DEFAULT_CORE"));
}
}
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkControllerTest.java Tue Jan 12 14:56:48 2010
@@ -57,6 +57,7 @@
+ "zookeeper/server1/data";
ZkTestServer server = null;
SolrZkClient zkClient = null;
+ ZkController zkController = null;
try {
server = new ZkTestServer(zkDir);
server.run();
@@ -79,10 +80,10 @@
zkClient.printLayoutToStdOut();
}
- ZkController zkController = new ZkController(ZOO_KEEPER_ADDRESS,
- "collection1", "localhost", "8983", "/solr", TIMEOUT);
+ zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT,
+ "localhost", "8983", "/solr");
Map<String,ShardInfoList> shardInfoMap = zkController
- .readShardInfo(shardsPath);
+ .readShardsNode(shardsPath);
assertTrue(shardInfoMap.size() > 0);
Set<Entry<String,ShardInfoList>> entries = shardInfoMap.entrySet();
@@ -117,6 +118,9 @@
if (server != null) {
server.shutdown();
}
+ if(zkController != null) {
+ zkController.close();
+ }
}
}
@@ -140,15 +144,43 @@
zkClient.printLayoutToStdOut();
}
- ZkController zkController = new ZkController(ZOO_KEEPER_ADDRESS,
- "collection1", "localhost", "8983", "/solr", TIMEOUT);
+ ZkController zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT,
+ "localhost", "8983", "/solr");
String configName = zkController.readConfigName(COLLECTION_NAME);
assertEquals(configName, actualConfigName);
+
+ // nocommit : close in finally
+ zkController.close();
zkClient.close();
server.shutdown();
}
+
+ public void testUploadToCloud() throws Exception {
+ String zkDir = tmpDir.getAbsolutePath() + File.separator
+ + "zookeeper/server1/data";
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+ server.run();
+
+ AbstractZkTestCase.makeSolrZkNode();
+
+ ZkController zkController = new ZkController(ZOO_KEEPER_ADDRESS, TIMEOUT,
+ "localhost", "8983", "/solr");
+
+
+ zkController.uploadDirToCloud(new File("solr/conf"), ZkController.CONFIGS_ZKNODE + "/config1");
+
+ if (DEBUG) {
+ zkController.printLayoutToStdOut();
+ }
+
+ // nocommit close in finally
+ zkController.close();
+ server.shutdown();
+
+ }
private void addShardToZk(SolrZkClient zkClient, String shardsPath,
String url, String shardList) throws IOException, KeeperException,
@@ -160,7 +192,7 @@
props.put(CollectionInfo.SHARD_LIST_PROP, shardList);
props.store(baos, ZkController.PROPS_DESC);
- zkClient.create(shardsPath + ZkController.NODE_ZKPREFIX,
+ zkClient.create(shardsPath + ZkController.CORE_ZKPREFIX,
baos.toByteArray(), CreateMode.EPHEMERAL_SEQUENTIAL);
}
Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java?rev=898348&r1=898347&r2=898348&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/ZkSolrClientTest.java Tue Jan 12 14:56:48 2010
@@ -18,24 +18,51 @@
*/
import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
public class ZkSolrClientTest extends TestCase {
protected File tmpDir = new File(System.getProperty("java.io.tmpdir")
+ System.getProperty("file.separator") + getClass().getName() + "-"
+ System.currentTimeMillis());
- public void testBasic() throws Exception {
+ public void testConnect() throws Exception {
+ String zkDir = tmpDir.getAbsolutePath() + File.separator
+ + "zookeeper/server1/data";
+ ZkTestServer server = null;
+
+ server = new ZkTestServer(zkDir);
+ server.run();
+
+ SolrZkClient zkClient = new SolrZkClient(AbstractZkTestCase.ZOO_KEEPER_ADDRESS,
+ AbstractZkTestCase.TIMEOUT);
+
+ zkClient.close();
+ server.shutdown();
+ }
+
+ public void testMakeRootNode() throws Exception {
+ String zkDir = tmpDir.getAbsolutePath() + File.separator
+ + "zookeeper/server1/data";
+ ZkTestServer server = null;
+
+ server = new ZkTestServer(zkDir);
+ server.run();
+
+ AbstractZkTestCase.makeSolrZkNode();
+
+ SolrZkClient zkClient = new SolrZkClient(AbstractZkTestCase.ZOO_KEEPER_SERVER,
+ AbstractZkTestCase.TIMEOUT);
+
+ assertTrue(zkClient.exists("/solr"));
+
+ zkClient.close();
+ server.shutdown();
+ }
+
+ public void testReconnect() throws Exception {
String zkDir = tmpDir.getAbsolutePath() + File.separator
+ "zookeeper/server1/data";
ZkTestServer server = null;
@@ -47,40 +74,7 @@
AbstractZkTestCase.makeSolrZkNode();
zkClient = new SolrZkClient(AbstractZkTestCase.ZOO_KEEPER_ADDRESS,
- AbstractZkTestCase.TIMEOUT, new ZkClientConnectionStrategy() {
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- @Override
- public void reconnect(final String serverAddress, final int zkClientTimeout,
- final Watcher watcher, final ZkUpdate updater) throws IOException {
- System.out.println("reconnecting");
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- // nocommit
- System.out.println("Attempting the connect...");
- try {
- updater.update(new ZooKeeper(serverAddress, zkClientTimeout, watcher));
- // nocommit
- System.out.println("Connect done");
- } catch (Exception e) {
- // nocommit
- e.printStackTrace();
- System.out.println("failed reconnect");
- }
- executor.shutdownNow();
-
- }
- }, 0, 400, TimeUnit.MILLISECONDS);
-
- }
-
- @Override
- public void connect(String zkServerAddress, int zkClientTimeout,
- Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
- System.out.println("connecting");
- updater.update(new ZooKeeper(zkServerAddress, zkClientTimeout, watcher));
-
- }
- });
+ AbstractZkTestCase.TIMEOUT);
String shardsPath = "/collections/collection1/shards";
zkClient.makePath(shardsPath);
@@ -103,7 +97,9 @@
server = new ZkTestServer(zkDir);
server.run();
- Thread.sleep(80);
+ // wait for reconnect
+ Thread.sleep(1000);
+
zkClient.makePath("collections/collection1/config=collection3");
zkClient.printLayoutToStdOut();
@@ -114,6 +110,7 @@
} catch(Exception e) {
// nocommit
e.printStackTrace();
+ throw e;
} finally {
if (zkClient != null) {