You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2010/10/13 19:01:33 UTC
svn commit: r1022188 [2/4] - in /lucene/dev/trunk/solr: ./ lib/
src/common/org/apache/solr/common/cloud/
src/common/org/apache/solr/common/params/ src/java/org/apache/solr/cloud/
src/java/org/apache/solr/core/ src/java/org/apache/solr/handler/admin/ sr...
Added: lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java (added)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,659 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handle ZooKeeper interactions.
+ *
+ * notes: loads everything on init, creates what's not there - further updates
+ * are prompted with Watches.
+ *
+ * TODO: exceptions during shutdown on attempts to update cloud state
+ *
+ */
+public final class ZkController {
+
+ private static Logger log = LoggerFactory.getLogger(ZkController.class);
+
+ static final String NEWL = System.getProperty("line.separator");
+
+
+ private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
+ private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
+
+
+ // package private for tests
+
+ static final String CONFIGS_ZKNODE = "/configs";
+
+ public final static String COLLECTION_PARAM_PREFIX="collection.";
+ public final static String CONFIGNAME_PROP="configName";
+
+ private SolrZkClient zkClient;
+
+ private ZkStateReader zkStateReader;
+
+ private String zkServerAddress;
+
+ private String localHostPort;
+ private String localHostContext;
+ private String localHostName;
+ private String localHost;
+
+ private String hostName;
+
+ /**
+ * Connects to ZooKeeper, creates the {@link ZkStateReader} and then runs the
+ * one-time node setup in {@code init()}.
+ *
+ * The {@link OnReconnect} callback handed to the client re-makes the
+ * collections and shards watches, re-registers this node as live and
+ * refreshes the cloud state.
+ *
+ * NOTE(review): the callback reads the zkStateReader field, which is only
+ * assigned after the SolrZkClient constructor returns - confirm the callback
+ * cannot fire before that assignment.
+ *
+ * @param zkServerAddress ZooKeeper server host address
+ * @param zkClientTimeout passed to SolrZkClient; presumably the session timeout in ms - TODO confirm
+ * @param zkClientConnectTimeout passed to SolrZkClient; presumably the connect timeout in ms - TODO confirm
+ * @param localHost host of this node, with or without an http(s):// prefix; if null the local host name is looked up
+ * @param locaHostPort port of this node, used to build the node name and shard URLs
+ * @param localHostContext context path of this node, used to build the node name and shard URLs
+ * @throws InterruptedException
+ * @throws TimeoutException
+ * @throws IOException
+ */
+ public ZkController(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
+ String localHostContext) throws InterruptedException,
+ TimeoutException, IOException {
+ this.zkServerAddress = zkServerAddress;
+ this.localHostPort = locaHostPort;
+ this.localHostContext = localHostContext;
+ this.localHost = localHost;
+
+ zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
+ // on reconnect, reload cloud info
+ new OnReconnect() {
+
+ public void command() {
+ try {
+ zkStateReader.makeCollectionsNodeWatches();
+ zkStateReader.makeShardsWatches(true);
+ createEphemeralLiveNode();
+ zkStateReader.updateCloudState(false);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }
+ });
+ zkStateReader = new ZkStateReader(zkClient);
+ init();
+ }
+
+ /**
+ * @param shardId
+ * @param collection
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ private void addZkShardsNode(String shardId, String collection) throws IOException, InterruptedException, KeeperException {
+
+ String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
+
+ try {
+
+ // shards node
+ if (!zkClient.exists(shardsZkPath)) {
+ if (log.isInfoEnabled()) {
+ log.info("creating zk shards node:" + shardsZkPath);
+ }
+ // makes shards zkNode if it doesn't exist
+ zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
+
+ // TODO: consider how these notifications are being done
+ // ping that there is a new shardId
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+
+ }
+ } catch (KeeperException e) {
+ // its okay if another beats us creating the node
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ throw e;
+ }
+ }
+
+ }
+
+ /**
+ * Closes the underlying ZooKeeper client.
+ */
+ public void close() {
+ try {
+ zkClient.close();
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
+
+ /**
+  * Checks whether a config file is present under the configs zkNode.
+  *
+  * @param collection name of the node directly below the configs zkNode
+  * @param fileName name of the file below that node
+  * @return true if ZooKeeper reports a node at the resulting path
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public boolean configFileExists(String collection, String fileName)
+     throws KeeperException, InterruptedException {
+   final String configFilePath = CONFIGS_ZKNODE + "/" + collection + "/" + fileName;
+   return zkClient.exists(configFilePath, null) != null;
+ }
+
+ /**
+  * @return the cluster information currently held by the {@link ZkStateReader}
+  */
+ public CloudState getCloudState() {
+   final CloudState state = zkStateReader.getCloudState();
+   return state;
+ }
+
+ /**
+  * Reads the raw bytes of a config file stored under the configs zkNode.
+  *
+  * @param zkConfigName name of the config set below the configs zkNode
+  * @param fileName name of the file within that config set
+  * @return the data stored at the file's zkNode, never null
+  * @throws KeeperException
+  * @throws InterruptedException
+  * @throws ZooKeeperException if the zkNode holds no data
+  */
+ public byte[] getConfigFileData(String zkConfigName, String fileName)
+     throws KeeperException, InterruptedException {
+   final String zkPath = CONFIGS_ZKNODE + "/" + zkConfigName + "/" + fileName;
+   final byte[] content = zkClient.getData(zkPath, null, null);
+   if (content != null) {
+     return content;
+   }
+
+   log.error("Config file contains no data:" + zkPath);
+   throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+       "Config file contains no data:" + zkPath);
+ }
+
+ // TODO: consider how this is done
+ /**
+  * Normalizes the configured local host into URL form and stores the result
+  * back into the localHost field.
+  * <ul>
+  * <li>no host configured: "http://" plus the local host name is used</li>
+  * <li>host already starts with http:// or https://: it is kept as is</li>
+  * <li>otherwise: "http://" is prepended</li>
+  * </ul>
+  * Previously a host that already carried a scheme had the scheme prepended a
+  * second time (e.g. "http://http://host"), which also corrupted the host name
+  * derived from it.
+  *
+  * @return the local host including its http(s):// prefix
+  * @throws IOException if the local host name cannot be resolved
+  */
+ private String getHostAddress() throws IOException {
+   if (localHost == null) {
+     localHost = "http://" + InetAddress.getLocalHost().getHostName();
+   } else if (!URL_PREFIX.matcher(localHost).matches()) {
+     // only add a scheme when the configured host does not already have one
+     localHost = "http://" + localHost;
+   }
+
+   return localHost;
+ }
+
+ /**
+  * @return the host name extracted from the local host URL during initialization
+  */
+ public String getHostName() {
+   return this.hostName;
+ }
+
+ /**
+  * @return the ZooKeeper client this controller was constructed with
+  */
+ public SolrZkClient getZkClient() {
+   return this.zkClient;
+ }
+
+ /**
+ * @return the ZooKeeper server address this controller was constructed with
+ */
+ public String getZkServerAddress() {
+ return zkServerAddress;
+ }
+
+ /**
+ * One-time setup run from the constructor: resolves the local host URL and
+ * derives hostName from it, makes sure the live nodes zkNode exists,
+ * registers this node as live, makes sure the collections zkNode exists and
+ * sets up the collections node watches.
+ *
+ * An IOException is rethrown as a SolrException; an unparsable host, an
+ * interrupt or a ZooKeeper failure is rethrown as a ZooKeeperException.
+ */
+ private void init() {
+
+ try {
+ localHostName = getHostAddress();
+ // strip the http(s):// scheme to obtain the bare host name
+ Matcher m = URL_POST.matcher(localHostName);
+
+ if (m.matches()) {
+ hostName = m.group(1);
+ } else {
+ log.error("Unrecognized host:" + localHostName);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unrecognized host:" + localHostName);
+ }
+
+ // makes nodes zkNode
+ try {
+ zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
+ } catch (KeeperException e) {
+ // it's okay if another node beats us to creating the zkNode
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
+
+ createEphemeralLiveNode();
+ setUpCollectionsNode();
+ zkStateReader.makeCollectionsNodeWatches();
+
+ } catch (IOException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Can't create ZooKeeperController", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }
+
+ /**
+ * Registers this node under the live nodes zkNode as an ephemeral node named
+ * after {@link #getNodeName()}, then sets a children watch on the live nodes
+ * zkNode. Any node left over at the same path is deleted first.
+ *
+ * The watcher refreshes the live nodes through the ZkStateReader and then
+ * re-registers itself on the path of the event that triggered it.
+ *
+ * @throws KeeperException on any ZooKeeper failure other than NODEEXISTS while creating the node
+ * @throws InterruptedException
+ */
+ private void createEphemeralLiveNode() throws KeeperException,
+ InterruptedException {
+ String nodeName = getNodeName();
+ String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
+ log.info("Register node as live in ZooKeeper:" + nodePath);
+ Watcher liveNodeWatcher = new Watcher() {
+
+ public void process(WatchedEvent event) {
+ try {
+ log.info("Updating live nodes:" + zkClient);
+ try {
+ zkStateReader.updateLiveNodes();
+ } finally {
+ // re-make watch
+
+ String path = event.getPath();
+ if(path == null) {
+ // on shutdown, it appears this can trigger with a null path
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ // NOTE(review): returning from inside finally discards any exception
+ // thrown by updateLiveNodes() above - confirm that is intended
+ return;
+ }
+ zkClient.getChildren(event.getPath(), this);
+ }
+ } catch (KeeperException e) {
+ // an expired session or lost connection is only logged, not rethrown
+ if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }
+
+ };
+ try {
+ boolean nodeDeleted = true;
+ try {
+ // we attempt a delete in the case of a quick server bounce -
+ // if there was not a graceful shutdown, the node may exist
+ // until expiration timeout - so a node won't be created here because
+ // it exists, but eventually the node will be removed. So delete
+ // in case it exists and create a new node.
+ zkClient.delete(nodePath, -1);
+ } catch (KeeperException.NoNodeException e) {
+ // fine if there is nothing to delete
+ // TODO: annoying that ZK logs a warning on us
+ nodeDeleted = false;
+ }
+ if (nodeDeleted) {
+ log
+ .info("Found a previous node that still exists while trying to register a new live node "
+ + nodePath + " - removing existing node to create another.");
+ }
+ zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+ } catch (KeeperException e) {
+ // it's okay if the node already exists
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ throw e;
+ }
+ }
+ // the watch is set whether or not the ephemeral node was newly created
+ zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
+ }
+
+ /**
+  * @return the name of this node in the form {@code host:port_context}
+  */
+ public String getNodeName() {
+   final String hostAndPort = hostName + ":" + localHostPort;
+   return hostAndPort + "_" + localHostContext;
+ }
+
+ /**
+  * Asks ZooKeeper whether a node exists at the given path.
+  *
+  * @param path absolute zkNode path to check
+  * @return true if the ZooKeeper client reports the path as existing
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public boolean pathExists(String path) throws KeeperException,
+     InterruptedException {
+   final boolean present = zkClient.exists(path);
+   return present;
+ }
+
+ /**
+  * Looks up the config name stored in the properties of a collection's zkNode.
+  *
+  * @param collection name of the collection whose zkNode is read
+  * @return the configured config name, or null if the zkNode holds no data or
+  *         no configName property
+  * @throws KeeperException
+  * @throws InterruptedException
+  * @throws IOException
+  * @throws ZooKeeperException if a config name is set but no such config
+  *         exists under the configs zkNode
+  */
+ public String readConfigName(String collection) throws KeeperException,
+     InterruptedException, IOException {
+
+   final String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
+   if (log.isInfoEnabled()) {
+     log.info("Load collection config from:" + collectionPath);
+   }
+
+   final byte[] rawProps = zkClient.getData(collectionPath, null, null);
+   final ZkNodeProps collectionProps = new ZkNodeProps();
+   if (rawProps == null) {
+     // nothing stored on the collection node, so no config name either
+     return null;
+   }
+
+   collectionProps.load(rawProps);
+   final String configName = collectionProps.get(CONFIGNAME_PROP);
+   if (configName == null) {
+     return null;
+   }
+
+   if (!zkClient.exists(CONFIGS_ZKNODE + "/" + configName)) {
+     log.error("Specified config does not exist in ZooKeeper:" + configName);
+     throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+         "Specified config does not exist in ZooKeeper:" + configName);
+   }
+
+   return configName;
+ }
+
+ /**
+ * Register shard with ZooKeeper.
+ *
+ * Stores the core's URL and this node's name as properties on a zkNode named
+ * {@code <nodeName>_<coreName>} below the shard's zkNode, then pings the
+ * collections zkNode so others update their cloud info. Returns without doing
+ * anything if the shard's zkNode already exists and forcePropsUpdate is false.
+ *
+ * NOTE(review): on the forcePropsUpdate path only the shard's zkNode is
+ * checked for existence, not the per-core child that setData is called on -
+ * confirm the child is guaranteed to exist in that case.
+ *
+ * @param coreName name of the core being registered
+ * @param cloudDesc supplies the collection name and shard id of the core
+ * @param forcePropsUpdate update solr.xml core props even if the shard is already registered
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void register(String coreName, CloudDescriptor cloudDesc, boolean forcePropsUpdate) throws IOException,
+ KeeperException, InterruptedException {
+ String shardUrl = localHostName + ":" + localHostPort + "/" + localHostContext
+ + "/" + coreName;
+
+ String collection = cloudDesc.getCollectionName();
+
+ String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
+
+ boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
+
+ if(shardZkNodeAlreadyExists && !forcePropsUpdate) {
+ return;
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Register shard - core:" + coreName + " address:"
+ + shardUrl);
+ }
+
+ ZkNodeProps props = new ZkNodeProps();
+ props.put(ZkStateReader.URL_PROP, shardUrl);
+
+ props.put(ZkStateReader.NODE_NAME, getNodeName());
+
+ byte[] bytes = props.store();
+
+ String shardZkNodeName = getNodeName() + "_" + coreName;
+
+ if(shardZkNodeAlreadyExists && forcePropsUpdate) {
+ zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
+ // tell everyone to update cloud info
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+ } else {
+ addZkShardsNode(cloudDesc.getShardId(), collection);
+ try {
+ zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
+ CreateMode.PERSISTENT);
+ // tell everyone to update cloud info
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+ } catch (KeeperException e) {
+ // it's okay if the node already exists
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ throw e;
+ }
+ // for some reason the shard already exists, though it didn't when we
+ // started registration - just return
+ return;
+ }
+ }
+
+ }
+
+ /**
+ * Currently a no-op: nothing is removed from or changed in ZooKeeper.
+ *
+ * @param coreName name of the core being unregistered (unused)
+ * @param cloudDesc cloud descriptor of the core (unused)
+ */
+ public void unregister(String coreName, CloudDescriptor cloudDesc) {
+ // TODO : perhaps mark the core down in zk?
+ }
+
+ /**
+ * @param dir
+ * @param zkPath
+ * @throws IOException
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void uploadToZK(File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
+ File[] files = dir.listFiles();
+ for(File file : files) {
+ if (!file.getName().startsWith(".")) {
+ if (!file.isDirectory()) {
+ zkClient.setData(zkPath + "/" + file.getName(), file);
+ } else {
+ uploadToZK(file, zkPath + "/" + file.getName());
+ }
+ }
+ }
+ }
+
+ /**
+  * Uploads a local configuration directory to ZooKeeper under the configs
+  * zkNode, using the given config name as the node name.
+  *
+  * @param dir local directory holding the configuration files
+  * @param configName name the configuration is stored under
+  * @throws IOException
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void uploadConfigDir(File dir, String configName) throws IOException, KeeperException, InterruptedException {
+   final String configZkPath = ZkController.CONFIGS_ZKNODE + "/" + configName;
+   uploadToZK(dir, configZkPath);
+ }
+
+ /**
+  * Convenience for testing: delegates to the ZooKeeper client's
+  * printLayoutToStdOut().
+  */
+ void printLayoutToStdOut() throws KeeperException, InterruptedException {
+   this.zkClient.printLayoutToStdOut();
+ }
+
+ private void setUpCollectionsNode() throws KeeperException, InterruptedException {
+ try {
+ if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
+ if (log.isInfoEnabled()) {
+ log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
+ }
+ // makes collections zkNode if it doesn't exist
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
+ }
+ } catch (KeeperException e) {
+ // its okay if another beats us creating the node
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }
+
+ /**
+ * Creates the zkNode for the descriptor's collection if it does not exist
+ * yet, storing the collection properties on it and pinging the collections
+ * zkNode afterwards.
+ *
+ * The properties come from one of three places:
+ * params on the descriptor that start with "collection." (prefix stripped);
+ * system properties with that prefix when bootstrap_confdir is set;
+ * otherwise the method polls the collection zkNode (5 tries, 2 seconds apart)
+ * for properties containing a configName and fails if none shows up.
+ * In the first two cases configName defaults to the collection.configName
+ * system property, or "configuration1" if that is unset.
+ *
+ * @param cd supplies the collection name and optional creation params
+ * @throws KeeperException on any ZooKeeper failure other than NODEEXISTS
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
+ String collection = cd.getCollectionName();
+
+ log.info("Check for collection zkNode:" + collection);
+ String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
+
+ try {
+ if(!zkClient.exists(collectionPath)) {
+ log.info("Creating collection in ZooKeeper:" + collection);
+ SolrParams params = cd.getParams();
+
+ try {
+ ZkNodeProps collectionProps = new ZkNodeProps();
+ // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
+ String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, "configuration1");
+
+ // params passed in - currently only done via core admin (create core command).
+ if (params != null) {
+ Iterator<String> iter = params.getParameterNamesIterator();
+ while (iter.hasNext()) {
+ String paramName = iter.next();
+ if (paramName.startsWith(COLLECTION_PARAM_PREFIX)) {
+ collectionProps.put(paramName.substring(COLLECTION_PARAM_PREFIX.length()), params.get(paramName));
+ }
+ }
+
+ // if the config name wasn't passed in, use the default
+ if (!collectionProps.containsKey(CONFIGNAME_PROP))
+ collectionProps.put(CONFIGNAME_PROP, defaultConfigName);
+
+ } else if(System.getProperty("bootstrap_confdir") != null) {
+ // if we are bootstrapping a collection, default the config for
+ // a new collection to the collection we are bootstrapping
+ log.info("Setting config for collection:" + collection + " to " + defaultConfigName);
+
+ Properties sysProps = System.getProperties();
+ for (String sprop : System.getProperties().stringPropertyNames()) {
+ if (sprop.startsWith(COLLECTION_PARAM_PREFIX)) {
+ collectionProps.put(sprop.substring(COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop));
+ }
+ }
+
+ // if the config name wasn't passed in, use the default
+ if (!collectionProps.containsKey(CONFIGNAME_PROP))
+ collectionProps.put(CONFIGNAME_PROP, defaultConfigName);
+
+ } else {
+ // check for configName
+ log.info("Looking for collection configName");
+ int retry = 1;
+ for (; retry < 6; retry++) {
+ if (zkClient.exists(collectionPath)) {
+ collectionProps = new ZkNodeProps();
+ collectionProps.load(zkClient.getData(collectionPath, null, null));
+ if (collectionProps.containsKey(CONFIGNAME_PROP)) {
+ break;
+ }
+ }
+ log.info("Could not find collection configName - pausing for 2 seconds and trying again - try: " + retry);
+ Thread.sleep(2000);
+ }
+ // retry only reaches 6 when the loop ran out without a break
+ if (retry == 6) {
+ log.error("Could not find configName for collection " + collection);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Could not find configName for collection " + collection);
+ }
+ }
+
+ zkClient.makePath(collectionPath, collectionProps.store(), CreateMode.PERSISTENT, null, true);
+
+ // ping that there is a new collection
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+ } catch (KeeperException e) {
+ // it's okay if the node already exists
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ throw e;
+ }
+ }
+ } else {
+ log.info("Collection zkNode exists");
+ }
+
+ } catch (KeeperException e) {
+ // it's okay if another node beats us to creating the zkNode
+ if (e.code() != KeeperException.Code.NODEEXISTS) {
+ throw e;
+ }
+ }
+
+ }
+
+ /**
+  * @return the state reader created by this controller's constructor
+  */
+ public ZkStateReader getZkStateReader() {
+   return this.zkStateReader;
+ }
+
+}
Added: lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java (added)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,120 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * ResourceLoader that works with ZooKeeper.
+ *
+ */
+public class ZkSolrResourceLoader extends SolrResourceLoader {
+
+ private final String collectionZkPath;
+ private ZkController zkController;
+
+ /**
+  * Creates a loader that resolves resources below
+  * {@code <configs zkNode>/<collection>} in ZooKeeper.
+  *
+  * @param instanceDir instance directory handed to the parent loader
+  * @param collection name of the node below the configs zkNode to load from
+  * @param zooKeeperController controller used for all ZooKeeper access
+  */
+ public ZkSolrResourceLoader(String instanceDir, String collection,
+     ZkController zooKeeperController) {
+   super(instanceDir);
+   this.collectionZkPath = ZkController.CONFIGS_ZKNODE + "/" + collection;
+   this.zkController = zooKeeperController;
+ }
+
+ /**
+  * <p>
+  * This loader will first attempt to load resources from ZooKeeper, but if not found
+  * will delegate to the context classloader when possible,
+  * otherwise it will attempt to resolve resources using any jar files found in
+  * the "lib/" directory in the specified instance directory.
+  * </p>
+  *
+  * @param instanceDir instance directory handed to the parent loader
+  * @param collection name of the node below the configs zkNode to load from
+  * @param parent parent class loader handed to the parent loader
+  * @param coreProperties core properties handed to the parent loader
+  * @param zooKeeperController controller used for all ZooKeeper access
+  */
+ public ZkSolrResourceLoader(String instanceDir, String collection, ClassLoader parent,
+     Properties coreProperties, ZkController zooKeeperController) {
+   super(instanceDir, parent, coreProperties);
+   this.collectionZkPath = ZkController.CONFIGS_ZKNODE + "/" + collection;
+   this.zkController = zooKeeperController;
+ }
+
+ /**
+  * Opens any resource by its name. The resource is first looked up in
+  * ZooKeeper at {@code <collection config path>/<resource>}; if no such
+  * zkNode exists it is looked up through the class loader.
+  * Override this method to customize loading resources.
+  *
+  * If the thread is interrupted while talking to ZooKeeper, the interrupt
+  * flag is restored before the failure is reported, as the other ZooKeeper
+  * call sites in this file do.
+  *
+  * @param resource name of the resource, relative to the config path
+  * @return the stream for the named resource
+  * @throws RuntimeException if ZooKeeper access fails or the resource cannot
+  *         be found in either location
+  */
+ public InputStream openResource(String resource) {
+   InputStream is = null;
+   String file = collectionZkPath + "/" + resource;
+   try {
+     if (zkController.pathExists(file)) {
+       byte[] bytes = zkController.getZkClient().getData(file, null, null);
+       return new ByteArrayInputStream(bytes);
+     }
+   } catch (InterruptedException e) {
+     // Restore the interrupted status instead of silently swallowing it
+     Thread.currentThread().interrupt();
+     throw new RuntimeException("Error opening " + file, e);
+   } catch (Exception e) {
+     throw new RuntimeException("Error opening " + file, e);
+   }
+   try {
+     // delegate to the class loader (looking into $INSTANCE_DIR/lib jars)
+     is = classLoader.getResourceAsStream(resource);
+   } catch (Exception e) {
+     throw new RuntimeException("Error opening " + resource, e);
+   }
+   if (is == null) {
+     throw new RuntimeException("Can't find resource '" + resource
+         + "' in classpath or '" + collectionZkPath + "', cwd="
+         + System.getProperty("user.dir"));
+   }
+   return is;
+ }
+
+ /**
+  * Not supported by this loader.
+  *
+  * @throws ZooKeeperException always
+  */
+ public String getConfigDir() {
+   final String message =
+       "ZkSolrResourceLoader does not support getConfigDir() - likely, what you are trying to do is not supported in ZooKeeper mode";
+   throw new ZooKeeperException(ErrorCode.SERVER_ERROR, message);
+ }
+
+ public String[] listConfigDir() {
+ List<String> list;
+ try {
+ list = zkController.getZkClient().getChildren(collectionZkPath, null);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ return list.toArray(new String[0]);
+ }
+
+}
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java Wed Oct 13 17:01:13 2010
@@ -21,6 +21,7 @@ import java.io.*;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
import java.text.SimpleDateFormat;
import org.slf4j.Logger;
@@ -31,13 +32,19 @@ import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.SolrZkServer;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.DOMUtil;
import org.apache.solr.common.util.XML;
import org.apache.solr.common.util.FileUtils;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.schema.IndexSchema;
+import org.apache.zookeeper.KeeperException;
import org.apache.commons.io.IOUtils;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
@@ -58,6 +65,9 @@ public class CoreContainer
protected boolean persistent = false;
protected String adminPath = null;
protected String managementPath = null;
+ protected String hostPort;
+ protected String hostContext;
+ protected String host;
protected CoreAdminHandler coreAdminHandler = null;
protected File configFile = null;
protected String libDir = null;
@@ -68,12 +78,86 @@ public class CoreContainer
protected String adminHandler;
protected boolean shareSchema;
protected String solrHome;
+ @Deprecated
protected String solrConfigFilenameOverride;
private String defaultCoreName = "";
+ private ZkController zkController;
+ private SolrZkServer zkServer;
+
+ private String zkHost;
public CoreContainer() {
solrHome = SolrResourceLoader.locateSolrHome();
}
+
+ /**
+ * Sets up ZooKeeper support for this container. Does nothing unless a
+ * ZooKeeper host is given (argument or zkHost system property) or the zkRun
+ * system property is set. Otherwise it creates and starts a SolrZkServer,
+ * creates the ZkController and, if the bootstrap_confdir system property is
+ * set, uploads that directory as a config set.
+ *
+ * Failures are logged and rethrown as ZooKeeperException.
+ *
+ * @param zkHost ZooKeeper host string; if null the zkHost system property is used
+ * @param zkClientTimeout passed through to the ZkController constructor
+ */
+ private void initZooKeeper(String zkHost, int zkClientTimeout) {
+ // prefer the zkHost passed in; fall back to the zkHost system property
+ String zookeeperHost;
+ if(zkHost == null) {
+ zookeeperHost = System.getProperty("zkHost");
+ } else {
+ zookeeperHost = zkHost;
+ }
+
+ String zkRun = System.getProperty("zkRun");
+
+ if (zkRun == null && zookeeperHost == null)
+ return; // not in zk mode
+
+ zkServer = new SolrZkServer(zkRun, zookeeperHost, solrHome, hostPort);
+ zkServer.parseConfig();
+ zkServer.start();
+
+ // set client from server config if not already set
+ if (zookeeperHost == null) {
+ zookeeperHost = zkServer.getClientString();
+ }
+
+ int zkClientConnectTimeout = 5000;
+
+ if (zookeeperHost != null) {
+ // we are ZooKeeper enabled
+ try {
+ // If this is an ensemble, allow for a long connect time for other servers to come up
+ if (zkRun != null && zkServer.getServers().size() > 1) {
+ zkClientConnectTimeout = 24 * 60 * 60 * 1000; // 1 day for embedded ensemble
+ log.info("Zookeeper client=" + zookeeperHost + " Waiting for a quorum.");
+ } else {
+ log.info("Zookeeper client=" + zookeeperHost);
+ }
+ zkController = new ZkController(zookeeperHost, zkClientTimeout, zkClientConnectTimeout, host, hostPort, hostContext);
+
+ String confDir = System.getProperty("bootstrap_confdir");
+ if(confDir != null) {
+ File dir = new File(confDir);
+ if(!dir.isDirectory()) {
+ throw new IllegalArgumentException("bootstrap_confdir must be a directory of configuration files");
+ }
+ // config set name: collection.configName system property, defaulting to "configuration1"
+ String confName = System.getProperty(ZkController.COLLECTION_PARAM_PREFIX+ZkController.CONFIGNAME_PROP, "configuration1");
+ zkController.uploadConfigDir(dir, confName);
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (TimeoutException e) {
+ log.error("Could not connect to ZooKeeper", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
+
+ }
public Properties getContainerProperties() {
return containerProperties;
@@ -82,6 +166,7 @@ public class CoreContainer
// Helper class to initialize the CoreContainer
public static class Initializer {
protected String solrConfigFilename = null;
+ protected String dataDir = null; // override datadir for single core mode
/**
* @deprecated all cores now abort on configuration error regardless of configuration
@@ -106,7 +191,8 @@ public class CoreContainer
public String getSolrConfigFilename() {
return solrConfigFilename;
}
- @Deprecated
+
+ @Deprecated
public void setSolrConfigFilename(String solrConfigFilename) {
this.solrConfigFilename = solrConfigFilename;
}
@@ -116,14 +202,16 @@ public class CoreContainer
ParserConfigurationException, SAXException {
CoreContainer cores = null;
String solrHome = SolrResourceLoader.locateSolrHome();
+ // TODO : fix broken logic confusing solr.xml with solrconfig.xml
File fconf = new File(solrHome, solrConfigFilename == null ? "solr.xml"
: solrConfigFilename);
log.info("looking for solr.xml: " + fconf.getAbsolutePath());
cores = new CoreContainer();
- cores.solrConfigFilenameOverride = solrConfigFilename;
+
if (fconf.exists()) {
cores.load(solrHome, fconf);
} else {
+ log.info("no solr.xml file found - using default");
cores.load(solrHome, new ByteArrayInputStream(DEF_SOLR_XML.getBytes()));
cores.configFile = fconf;
}
@@ -219,15 +307,29 @@ public class CoreContainer
if(dcoreName != null) {
defaultCoreName = dcoreName;
}
- persistent = cfg.getBool( "solr/@persistent", false );
- libDir = cfg.get( "solr/@sharedLib", null);
- adminPath = cfg.get( "solr/cores/@adminPath", null );
- shareSchema = cfg.getBool("solr/cores/@shareSchema", false );
+ persistent = cfg.getBool("solr/@persistent", false);
+ libDir = cfg.get("solr/@sharedLib", null);
+ zkHost = cfg.get("solr/@zkHost" , null);
+ adminPath = cfg.get("solr/cores/@adminPath", null);
+ shareSchema = cfg.getBool("solr/cores/@shareSchema", false);
+ int zkClientTimeout = cfg.getInt("solr/cores/@zkClientTimeout", 10000);
+
+ hostPort = System.getProperty("hostPort");
+ if (hostPort == null) {
+ hostPort = cfg.get("solr/cores/@hostPort", "8983");
+ }
+
+ hostContext = cfg.get("solr/cores/@hostContext", "solr");
+ host = cfg.get("solr/cores/@host", null);
+
if(shareSchema){
indexSchemaCache = new ConcurrentHashMap<String ,IndexSchema>();
}
adminHandler = cfg.get("solr/cores/@adminHandler", null );
managementPath = cfg.get("solr/cores/@managementPath", null );
+
+ zkClientTimeout = Integer.parseInt(System.getProperty("zkClientTimeout", Integer.toString(zkClientTimeout)));
+ initZooKeeper(zkHost, zkClientTimeout);
if (libDir != null) {
File f = FileUtils.resolvePath(new File(dir), libDir);
@@ -249,25 +351,9 @@ public class CoreContainer
SolrConfig.severeErrors.add(e);
SolrException.logOnce(log,null,e);
}
-
- // before looping over each core, let's check the names and fail
- // fast if the same one is reused multiple times.
- { // local scope, won't need these vars again
- NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core/@name",
- XPathConstants.NODESET);
- Set<String> names = new HashSet<String>();
- for (int i=0; i<nodes.getLength(); i++) {
- String name = DOMUtil.getText(nodes.item(i));
- if (names.contains(name)) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Multiple cores found with same name: " +
- name);
- }
- names.add(name);
- }
- }
NodeList nodes = (NodeList)cfg.evaluate("solr/cores/core", XPathConstants.NODESET);
+ boolean defaultCoreFound = false;
for (int i=0; i<nodes.getLength(); i++) {
Node node = nodes.item(i);
try {
@@ -282,12 +368,12 @@ public class CoreContainer
// be mapped to this.
name="";
}
-
CoreDescriptor p = new CoreDescriptor(this, name, DOMUtil.getAttr(node, "instanceDir", null));
// deal with optional settings
String opt = DOMUtil.getAttr(node, "config", null);
- if(solrConfigFilenameOverride != null && name.equals("")) {
+
+ if(solrConfigFilenameOverride != null) {
p.setConfigName(solrConfigFilenameOverride);
} else if (opt != null) {
p.setConfigName(opt);
@@ -296,6 +382,16 @@ public class CoreContainer
if (opt != null) {
p.setSchemaName(opt);
}
+ if (zkController != null) {
+ opt = DOMUtil.getAttr(node, "shard", null);
+ if (opt != null && opt.length() > 0) {
+ p.getCloudDescriptor().setShardId(opt);
+ }
+ opt = DOMUtil.getAttr(node, "collection", null);
+ if (opt != null) {
+ p.getCloudDescriptor().setCollectionName(opt);
+ }
+ }
opt = DOMUtil.getAttr(node, "properties", null);
if (opt != null) {
p.setPropertiesName(opt);
@@ -315,13 +411,35 @@ public class CoreContainer
SolrException.logOnce(log,null,ex);
}
}
- }
-
- finally {
+ } finally {
if (cfgis != null) {
try { cfgis.close(); } catch (Exception xany) {}
}
}
+
+
+ if(zkController != null) {
+ try {
+ synchronized (zkController.getZkStateReader().getUpdateLock()) {
+ zkController.getZkStateReader().makeShardZkNodeWatches(false);
+ zkController.getZkStateReader().updateCloudState(true);
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
}
private Properties readProperties(Config cfg, Node node) throws XPathExpressionException {
@@ -346,6 +464,12 @@ public class CoreContainer
}
cores.clear();
} finally {
+ if(zkController != null) {
+ zkController.close();
+ }
+ if (zkServer != null) {
+ zkServer.stop();
+ }
isShutDown = true;
}
}
@@ -385,6 +509,24 @@ public class CoreContainer
core.setName(name);
}
+ if (zkController != null) {
+ try {
+ zkController.register(core.getName(), core.getCoreDescriptor().getCloudDescriptor(), true);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+ e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ }
if( old == null || old == core) {
log.info( "registering core: "+name );
@@ -427,32 +569,88 @@ public class CoreContainer
String instanceDir = idir.getPath();
// Initialize the solr config
- SolrResourceLoader solrLoader = new SolrResourceLoader(instanceDir, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()));
- SolrConfig config = new SolrConfig(solrLoader, dcore.getConfigName(), null);
-
+ SolrResourceLoader solrLoader = null;
+
+ SolrConfig config = null;
+ String zkConfigName = null;
+ if(zkController == null) {
+ solrLoader = new SolrResourceLoader(instanceDir, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()));
+ config = new SolrConfig(solrLoader, dcore.getConfigName(), null);
+ } else {
+ try {
+ String collection = dcore.getCloudDescriptor().getCollectionName();
+ zkController.createCollectionZkNode(dcore.getCloudDescriptor());
+ // zkController.createCollectionZkNode(collection);
+ zkConfigName = zkController.readConfigName(collection);
+ if (zkConfigName == null) {
+ log.error("Could not find config name for collection:" + collection);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "Could not find config name for collection:" + collection);
+ }
+ solrLoader = new ZkSolrResourceLoader(instanceDir, zkConfigName, libLoader, getCoreProps(instanceDir, dcore.getPropertiesName(),dcore.getCoreProperties()), zkController);
+ config = getSolrConfigFromZk(zkConfigName, dcore.getConfigName(), solrLoader);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
+
IndexSchema schema = null;
- if(indexSchemaCache != null){
- //schema sharing is enabled. so check if it already is loaded
- File schemaFile = new File(dcore.getSchemaName());
- if (!schemaFile.isAbsolute()) {
- schemaFile = new File(solrLoader.getInstanceDir() + "conf" + File.separator + dcore.getSchemaName());
- }
- if(schemaFile. exists()){
- String key = schemaFile.getAbsolutePath()+":"+new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date(schemaFile.lastModified()));
- schema = indexSchemaCache.get(key);
- if(schema == null){
- log.info("creating new schema object for core: " + dcore.name);
- schema = new IndexSchema(config, dcore.getSchemaName(), null);
- indexSchemaCache.put(key,schema);
- } else {
- log.info("re-using schema object for core: " + dcore.name);
+    // Schema sharing (shareSchema=true): reuse an already-parsed IndexSchema when
+    // another core points at the same schema file with the same modification time.
+    if (indexSchemaCache != null) {
+      // The cache key is built from a local file path and its last-modified time,
+      // so this lookup only applies when configs are NOT served from ZooKeeper.
+      // (The committed condition was inverted, which disabled the cache for
+      // plain file-based cores.)
+      if (zkController == null) {
+        File schemaFile = new File(dcore.getSchemaName());
+        if (!schemaFile.isAbsolute()) {
+          // relative schema names are resolved against <instanceDir>/conf/
+          schemaFile = new File(solrLoader.getInstanceDir() + "conf"
+              + File.separator + dcore.getSchemaName());
+        }
+        if (schemaFile.exists()) {
+          // key = absolute path + ":" + last-modified timestamp (second resolution)
+          String key = schemaFile.getAbsolutePath()
+              + ":"
+              + new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date(
+                  schemaFile.lastModified()));
+          schema = indexSchemaCache.get(key);
+          if (schema == null) {
+            log.info("creating new schema object for core: " + dcore.name);
+            schema = new IndexSchema(config, dcore.getSchemaName(), null);
+            indexSchemaCache.put(key, schema);
+          } else {
+            log.info("re-using schema object for core: " + dcore.name);
+          }
         }
+      } else {
+        // TODO: handle caching from ZooKeeper - perhaps using ZooKeepers versioning
+        // Don't like this cache though - how does it empty as last modified changes?
       }
     }
if(schema == null){
- schema = new IndexSchema(config, dcore.getSchemaName(), null);
+ if(zkController != null) {
+ try {
+ schema = getSchemaFromZk(zkConfigName, dcore.getSchemaName(), config, solrLoader);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ } else {
+ schema = new IndexSchema(config, dcore.getSchemaName(), null);
+ }
}
- SolrCore core = new SolrCore(dcore.getName(), null, config, schema, dcore);
+ String dataDir = null;
+
+ SolrCore core = new SolrCore(dcore.getName(), dataDir, config, schema, dcore);
return core;
}
@@ -744,6 +942,18 @@ public class CoreContainer
}
opt = dcore.dataDir;
if (opt != null) writeAttribute(w,"dataDir",opt);
+
+ CloudDescriptor cd = dcore.getCloudDescriptor();
+ if (cd != null) {
+ opt = cd.getShardId();
+ if (opt != null)
+ writeAttribute(w,"shard",opt);
+ // only write out the collection name if it's not the default (the core name)
+ opt = cd.getCollectionName();
+ if (opt != null && !opt.equals(dcore.name))
+ writeAttribute(w,"collection",opt);
+ }
+
if (dcore.getCoreProperties() == null || dcore.getCoreProperties().isEmpty())
w.write("/>\n"); // core
else {
@@ -801,6 +1011,37 @@ public class CoreContainer
public String getSolrHome() {
return solrHome;
}
+
+  /**
+   * Reports whether this container has a ZooKeeper controller.
+   *
+   * @return true when the zkController field is non-null
+   */
+  public boolean isZooKeeperAware() {
+    return null != this.zkController;
+  }
+
+  /**
+   * Returns the ZooKeeper controller held by this container.
+   *
+   * @return the zkController field; callers in this class treat null as
+   *         "not running with ZooKeeper"
+   */
+  public ZkController getZkController() {
+    return this.zkController;
+  }
+
+  /**
+   * Builds a SolrConfig from a config file stored in ZooKeeper.
+   *
+   * @param zkConfigName name of the config set, passed to
+   *        zkController.getConfigFileData
+   * @param solrConfigFileName name of the solrconfig file; when null,
+   *        SolrConfig.DEFAULT_CONF_FILE is used
+   * @param resourceLoader loader handed to the SolrConfig constructor
+   * @return the parsed config
+   */
+  private SolrConfig getSolrConfigFromZk(String zkConfigName, String solrConfigFileName,
+      SolrResourceLoader resourceLoader) throws IOException,
+      ParserConfigurationException, SAXException, KeeperException,
+      InterruptedException {
+    // Resolve the default name up front so the bytes fetched from ZooKeeper and
+    // the name given to SolrConfig always refer to the same file. Previously the
+    // fetch used the raw (possibly null) name and only the constructor call
+    // applied the default.
+    String effectiveName = solrConfigFileName == null
+        ? SolrConfig.DEFAULT_CONF_FILE : solrConfigFileName;
+    byte[] config = zkController.getConfigFileData(zkConfigName, effectiveName);
+    InputStream is = new ByteArrayInputStream(config);
+    return new SolrConfig(resourceLoader, effectiveName, is);
+  }
+
+  /**
+   * Builds an IndexSchema from a schema file stored in ZooKeeper.
+   *
+   * @param zkConfigName name of the config set, passed to
+   *        zkController.getConfigFileData
+   * @param schemaName name of the schema file to fetch; also handed to IndexSchema
+   * @param config the SolrConfig the schema is created against
+   * @param resourceLoader not read by this method
+   * @return the parsed schema
+   */
+  private IndexSchema getSchemaFromZk(String zkConfigName, String schemaName,
+      SolrConfig config, SolrResourceLoader resourceLoader)
+      throws KeeperException, InterruptedException {
+    final byte[] schemaBytes = zkController.getConfigFileData(zkConfigName, schemaName);
+    final InputStream schemaStream = new ByteArrayInputStream(schemaBytes);
+    return new IndexSchema(config, schemaName, schemaStream);
+  }
+
private static final String DEF_SOLR_XML ="<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n" +
"<solr persistent=\"false\">\n" +
" <cores adminPath=\"/admin/cores\" defaultCoreName=\"" + DEFAULT_DEFAULT_CORE_NAME + "\">\n" +
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java Wed Oct 13 17:01:13 2010
@@ -20,6 +20,8 @@ package org.apache.solr.core;
import java.util.Properties;
import java.io.File;
+import org.apache.solr.cloud.CloudDescriptor;
+
/**
* A Solr core descriptor
*
@@ -34,10 +36,20 @@ public class CoreDescriptor {
protected String schemaName;
private final CoreContainer coreContainer;
private Properties coreProperties;
+
+ private CloudDescriptor cloudDesc;
public CoreDescriptor(CoreContainer coreContainer, String name, String instanceDir) {
this.coreContainer = coreContainer;
this.name = name;
+
+    // Only cores in a ZooKeeper-aware container get a cloud descriptor.
+    if(coreContainer.getZkController() != null) {
+      this.cloudDesc = new CloudDescriptor();
+      // cloud collection defaults to core name; the empty core name stands for
+      // the default core, so map it to the container's default core name.
+      // Compare by value: "==" only matches the interned "" literal and misses
+      // empty names that were built at runtime.
+      cloudDesc.setCollectionName("".equals(name) ? coreContainer.getDefaultCoreName() : name);
+      // default shard id is "<node name>_<core name>"
+      this.cloudDesc.setShardId(coreContainer.getZkController().getNodeName() + "_" + name);
+    }
+
if (name == null) {
throw new RuntimeException("Core needs a name");
}
@@ -112,6 +124,10 @@ public class CoreDescriptor {
// normalize zero length to null.
if (dataDir != null && dataDir.length()==0) dataDir=null;
}
+
+  /**
+   * Reports whether no explicit data directory is set on this descriptor.
+   *
+   * @return true when the dataDir field is null
+   */
+  public boolean usingDefaultDataDir() {
+    return null == this.dataDir;
+  }
/**@return the core instance directory. */
public String getInstanceDir() {
@@ -171,4 +187,12 @@ public class CoreDescriptor {
this.coreProperties.putAll(coreProperties);
}
}
+
+  /**
+   * Returns the cloud descriptor attached to this core descriptor.
+   *
+   * @return the cloudDesc field; null unless one was created in the constructor
+   *         (only done when the container has a ZkController) or set explicitly
+   */
+  public CloudDescriptor getCloudDescriptor() {
+    return this.cloudDesc;
+  }
+
+  /**
+   * Replaces the cloud descriptor attached to this core descriptor.
+   *
+   * @param cloudDesc the new descriptor; stored as given
+   */
+  public void setCloudDescriptor(final CloudDescriptor cloudDesc) {
+    this.cloudDesc = cloudDesc;
+  }
}
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java Wed Oct 13 17:01:13 2010
@@ -512,7 +512,8 @@ public final class SolrCore implements S
this.setName( name );
resourceLoader = config.getResourceLoader();
if (dataDir == null){
- dataDir = config.getDataDir();
+ // nocommit: why did solrconfig override core descriptor !?
+ if(cd.usingDefaultDataDir()) dataDir = config.getDataDir();
if(dataDir == null) dataDir = cd.getDataDir();
}
@@ -1556,12 +1557,10 @@ public final class SolrCore implements S
// Hide everything...
Set<String> hide = new HashSet<String>();
- File configdir = new File( solrConfig.getResourceLoader().getConfigDir() );
- if( configdir.exists() && configdir.isDirectory() ) {
- for( String file : configdir.list() ) {
- hide.add( file.toUpperCase(Locale.ENGLISH) );
- }
- }
+
+ for (String file : solrConfig.getResourceLoader().listConfigDir()) {
+ hide.add(file.toUpperCase(Locale.ENGLISH));
+ }
// except the "gettable" list
StringTokenizer st = new StringTokenizer( gettable );
@@ -1588,16 +1587,7 @@ public final class SolrCore implements S
"solrconfig.xml uses deprecated <bool name='facet.sort'>. Please "+
"update your config to use <string name='facet.sort'>.");
}
-
- if (!solrConfig.getBool("abortOnConfigurationError",true))
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Setting abortOnConfigurationError==false is no longer supported");
- if (null != solrConfig.getVal("abortOnConfigurationError", false))
- log.warn("The abortOnConfigurationError option is no longer supported "+
- "in solrconfig.xml. Setting it has no effect.");
-
- }
-
+ }
public CoreDescriptor getCoreDescriptor() {
return coreDescriptor;
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java Wed Oct 13 17:01:13 2010
@@ -69,7 +69,7 @@ public class SolrResourceLoader implemen
static final String base = "org.apache" + "." + project;
static final String[] packages = {"","analysis.","schema.","handler.","search.","update.","core.","response.","request.","update.processor.","util.", "spelling.", "handler.component.", "handler.dataimport." };
- private URLClassLoader classLoader;
+ protected URLClassLoader classLoader;
private final String instanceDir;
private String dataDir;
@@ -205,6 +205,15 @@ public class SolrResourceLoader implemen
public static String normalizeDir(String path) {
return ( path != null && (!(path.endsWith("/") || path.endsWith("\\"))) )? path + File.separator : path;
}
+
+  /**
+   * Lists the names of the entries in this loader's config directory.
+   *
+   * @return the entry names under {@link #getConfigDir()}, or an empty array
+   *         when that path does not exist, is not a directory, or cannot be
+   *         listed; never null
+   */
+  public String[] listConfigDir() {
+    File configdir = new File(getConfigDir());
+    // File.list() returns null for a missing path or non-directory AND when an
+    // I/O error occurs, so one null check covers every failure case; callers
+    // iterate the result directly and must never see null.
+    String[] names = configdir.list();
+    return names != null ? names : new String[0];
+  }
public String getConfigDir() {
return instanceDir + "conf/";
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Wed Oct 13 17:01:13 2010
@@ -17,6 +17,7 @@
package org.apache.solr.handler.admin;
+import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -228,7 +229,14 @@ public class CoreAdminHandler extends Re
try {
SolrParams params = req.getParams();
String name = params.get(CoreAdminParams.NAME);
- CoreDescriptor dcore = new CoreDescriptor(coreContainer, name, params.get(CoreAdminParams.INSTANCE_DIR));
+
+ String instanceDir = params.get(CoreAdminParams.INSTANCE_DIR);
+ if (instanceDir == null) {
+ // instanceDir = coreContainer.getSolrHome() + "/" + name;
+ instanceDir = name; // bare name is already relative to solr home
+ }
+
+ CoreDescriptor dcore = new CoreDescriptor(coreContainer, name, instanceDir);
// fillup optional parameters
String opts = params.get(CoreAdminParams.CONFIG);
@@ -243,6 +251,19 @@ public class CoreAdminHandler extends Re
if (opts != null)
dcore.setDataDir(opts);
+ CloudDescriptor cd = dcore.getCloudDescriptor();
+ if (cd != null) {
+ cd.setParams(req.getParams());
+
+ opts = params.get(CoreAdminParams.COLLECTION);
+ if (opts != null)
+ cd.setCollectionName(opts);
+
+ opts = params.get(CoreAdminParams.SHARD);
+ if (opts != null)
+ cd.setShardId(opts);
+ }
+
dcore.setCoreProperties(null);
SolrCore core = coreContainer.create(dcore);
coreContainer.register(name, core, false);
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java Wed Oct 13 17:01:13 2010
@@ -22,12 +22,19 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.*;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
@@ -110,11 +117,116 @@ public class QueryComponent extends Sear
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
- // TODO: temporary... this should go in a different component.
+ checkDistributed(rb);
+ }
+
+
+ // TODO: this could go in a different component, or in SearchHandler
+ // check if this is a distributed request and set info on the response builder
+ void checkDistributed(ResponseBuilder rb) {
+ SolrQueryRequest req = rb.req;
+ SolrParams params = req.getParams();
+
+ rb.isDistrib = params.getBool("distrib",false);
String shards = params.get(ShardParams.SHARDS);
- if (shards != null) {
- List<String> lst = StrUtils.splitSmart(shards, ",", true);
- rb.shards = lst.toArray(new String[lst.size()]);
+
+ // for back compat, a shards param with URLs like localhost:8983/solr will mean that this
+ // search is distributed.
+ boolean hasShardURL = shards != null && shards.indexOf('/') > 0;
+ rb.isDistrib = hasShardURL | rb.isDistrib;
+
+ if (rb.isDistrib) {
+ // since the cost of grabbing cloud state is still up in the air, we grab it only
+ // if we need it.
+ CloudState cloudState = null;
+ Map<String,Slice> slices = null;
+ CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
+ CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor();
+ ZkController zkController = coreDescriptor.getCoreContainer().getZkController();
+
+
+ if (shards != null) {
+ List<String> lst = StrUtils.splitSmart(shards, ",", true);
+ rb.shards = lst.toArray(new String[lst.size()]);
+ rb.slices = new String[rb.shards.length];
+
+ if (zkController != null) {
+ // figure out which shards are slices
+ for (int i=0; i<rb.shards.length; i++) {
+ if (rb.shards[i].indexOf('/') < 0) {
+ // this is a logical shard
+ rb.slices[i] = rb.shards[i];
+ rb.shards[i] = null;
+ }
+ }
+ }
+ } else if (zkController != null) {
+ // we weren't provided with a list of slices to query, so find the list that will cover the complete index
+
+ cloudState = zkController.getCloudState();
+
+ // TODO: check "collection" for which collection(s) to search.. but for now, just default
+ // to the collection for this core.
+ // This can be more efficient... we only record the name, even though we have the
+ // shard info we need in the next step of mapping slice->shards
+ slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+ rb.slices = slices.keySet().toArray(new String[slices.size()]);
+ rb.shards = new String[rb.slices.length];
+
+ /***
+ rb.slices = new String[slices.size()];
+ for (int i=0; i<rb.slices.length; i++) {
+ rb.slices[i] = slices.get(i).getName();
+ }
+ ***/
+ }
+
+ //
+ // Map slices to shards
+ //
+ if (zkController != null) {
+ for (int i=0; i<rb.shards.length; i++) {
+ if (rb.shards[i] == null) {
+ if (cloudState == null) {
+ cloudState = zkController.getCloudState();
+ slices = cloudState.getSlices(cloudDescriptor.getCollectionName());
+ }
+ String sliceName = rb.slices[i];
+
+ Slice slice = slices.get(sliceName);
+
+ if (slice==null) {
+ // Treat this the same as "all servers down" for a slice, and let things continue
+ // if partial results are acceptable
+ rb.shards[i] = "";
+ continue;
+ // throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "no such shard: " + sliceName);
+ }
+
+ Map<String, ZkNodeProps> sliceShards = slice.getShards();
+
+ // For now, recreate the | delimited list of equivalent servers
+ Set<String> liveNodes = cloudState.getLiveNodes();
+ StringBuilder sliceShardsStr = new StringBuilder();
+ boolean first = true;
+ for (ZkNodeProps nodeProps : sliceShards.values()) {
+ if (!liveNodes.contains(nodeProps.get(ZkStateReader.NODE_NAME)))
+ continue;
+ if (first) {
+ first = false;
+ } else {
+ sliceShardsStr.append('|');
+ }
+ String url = nodeProps.get("url");
+ if (url.startsWith("http://"))
+ url = url.substring(7);
+ sliceShardsStr.append(url);
+ }
+
+ rb.shards[i] = sliceShardsStr.toString();
+ }
+ }
+ }
}
String shards_rows = params.get(ShardParams.SHARDS_ROWS);
if(shards_rows != null) {
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java Wed Oct 13 17:01:13 2010
@@ -32,7 +32,6 @@ import java.util.WeakHashMap;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.QueryElevationParams;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +47,7 @@ import org.apache.lucene.index.IndexRead
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.util.StringHelper;
+import org.apache.solr.cloud.ZkController;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
@@ -172,19 +172,30 @@ public class QueryElevationComponent ext
"QueryElevationComponent must specify argument: '"+CONFIG_FILE
+"' -- path to elevate.xml" );
}
- File fC = new File( core.getResourceLoader().getConfigDir(), f );
- File fD = new File( core.getDataDir(), f );
- if( fC.exists() == fD.exists() ) {
- throw new SolrException( SolrException.ErrorCode.SERVER_ERROR,
- "QueryElevationComponent missing config file: '"+f + "\n"
- +"either: "+fC.getAbsolutePath() + " or " + fD.getAbsolutePath() + " must exist, but not both." );
- }
- if( fC.exists() ) {
- log.info( "Loading QueryElevation from: "+fC.getAbsolutePath() );
- Config cfg = new Config( core.getResourceLoader(), f );
- elevationCache.put(null, loadElevationMap( cfg ));
+ boolean exists = false;
+
+ // check if using ZooKeeper
+ ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+ if(zkController != null) {
+ // TODO : shouldn't have to keep reading the config name when it has been read before
+ exists = zkController.configFileExists(zkController.readConfigName(core.getCoreDescriptor().getCloudDescriptor().getCollectionName()), f);
+ } else {
+ File fC = new File( core.getResourceLoader().getConfigDir(), f );
+ File fD = new File( core.getDataDir(), f );
+ if( fC.exists() == fD.exists() ) {
+ throw new SolrException( SolrException.ErrorCode.SERVER_ERROR,
+ "QueryElevationComponent missing config file: '"+f + "\n"
+ +"either: "+fC.getAbsolutePath() + " or " + fD.getAbsolutePath() + " must exist, but not both." );
+ }
+ if( fC.exists() ) {
+ exists = true;
+ log.info( "Loading QueryElevation from: "+ fC.getAbsolutePath() );
+ Config cfg = new Config( core.getResourceLoader(), f );
+ elevationCache.put(null, loadElevationMap( cfg ));
+ }
}
- else {
+
+ if (!exists){
// preload the first data
RefCounted<SolrIndexSearcher> searchHolder = null;
try {
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java Wed Oct 13 17:01:13 2010
@@ -26,15 +26,11 @@ import org.apache.solr.request.SolrQuery
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.DocListAndSet;
import org.apache.solr.search.QParser;
-import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.search.SortSpec;
-import org.apache.solr.util.SolrPluginUtils;
+import org.apache.solr.search.SolrIndexSearcher;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* This class is experimental and will be changing in the future.
@@ -42,9 +38,8 @@ import java.util.Set;
* @version $Id$
* @since solr 1.3
*/
-public class ResponseBuilder {
-
-
+public class ResponseBuilder
+{
public SolrQueryRequest req;
public SolrQueryResponse rsp;
public boolean doHighlights;
@@ -101,7 +96,9 @@ public class ResponseBuilder {
public int stage; // What stage is this current request at?
//The address of the Shard
+ boolean isDistrib; // is this a distributed search?
public String[] shards;
+ public String[] slices; // the optional logical ids of the shards
public int shards_rows = -1;
public int shards_start = -1;
public List<ShardRequest> outgoing; // requests to be sent
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java Wed Oct 13 17:01:13 2010
@@ -17,32 +17,53 @@
package org.apache.solr.handler.component;
-import org.apache.solr.handler.RequestHandlerBase;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.RTimer;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.lucene.queryParser.ParseException;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.impl.LBHttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.RTimer;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.client.solrj.SolrServer;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
-
import org.apache.solr.util.SolrPluginUtils;
import org.apache.solr.util.plugin.SolrCoreAware;
-import org.apache.solr.core.SolrCore;
-import org.apache.lucene.queryParser.ParseException;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.HttpClient;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.*;
/**
*
@@ -199,7 +220,7 @@ public class SearchHandler extends Reque
subt.stop();
}
- if (rb.shards == null) {
+ if (!rb.isDistrib) {
// a normal non-distributed request
// The semantics of debugging vs not debugging are different enough that
@@ -265,6 +286,7 @@ public class SearchHandler extends Reque
for (String shard : sreq.actualShards) {
ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
params.remove(ShardParams.SHARDS); // not a top-level request
+ params.remove("distrib"); // not a top-level request
params.remove("indent");
params.remove(CommonParams.HEADER_ECHO_PARAMS);
params.set(ShardParams.IS_SHARD, true); // a sub (shard) request
@@ -367,6 +389,8 @@ class HttpCommComponent {
static HttpClient client;
+ static Random r = new Random();
+ static LBHttpSolrServer loadbalancer;
static {
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
@@ -375,12 +399,29 @@ class HttpCommComponent {
mgr.getParams().setConnectionTimeout(SearchHandler.connectionTimeout);
mgr.getParams().setSoTimeout(SearchHandler.soTimeout);
// mgr.getParams().setStaleCheckingEnabled(false);
- client = new HttpClient(mgr);
+
+ client = new HttpClient(mgr);
+
+ // prevent retries (note: this didn't work when set on mgr.. needed to be set on client)
+ DefaultHttpMethodRetryHandler retryhandler = new DefaultHttpMethodRetryHandler(0, false);
+ client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, retryhandler);
+
+ try {
+ loadbalancer = new LBHttpSolrServer(client);
+ } catch (MalformedURLException e) {
+ // should be impossible since we're not passing any URLs here
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,e);
+ }
}
CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor);
Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>();
+ // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+ // This is primarily to keep track of what order we should use to query the replicas of a shard
+ // so that we use the same replica for all phases of a distributed request.
+ Map<String,List<String>> shardToURLs = new HashMap<String,List<String>>();
+
HttpCommComponent() {
}
@@ -404,7 +445,36 @@ class HttpCommComponent {
}
}
+
+  // Returns the replica URLs for a "|"-separated shard spec, in the order they should be tried.
+  // The order is decided the first time a spec is seen and then remembered in shardToURLs.
+  // Not thread safe... don't use in Callable.
+  // Don't modify the returned URL list.
+  private List<String> getURLs(String shard) {
+    List<String> known = shardToURLs.get(shard);
+    if (known != null) {
+      return known;
+    }
+
+    // split "host1|host2" into its replicas and turn each address into a URL
+    List<String> replicas = StrUtils.splitSmart(shard, "|", true);
+    int count = replicas.size();
+    for (int idx = 0; idx < count; idx++) {
+      String address = replicas.get(idx);
+      replicas.set(idx, SearchHandler.scheme + address);
+    }
+
+    // A random order (rather than round-robin) keeps multiple shards from
+    // accidentally falling in step and querying the same replica at the same time.
+    if (count >= 2) {
+      Collections.shuffle(replicas, r);
+    }
+
+    shardToURLs.put(shard, replicas);
+    return replicas;
+  }
+
+
void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
+ // do this outside of the callable for thread safety reasons
+ final List<String> urls = getURLs(shard);
+
Callable<ShardResponse> task = new Callable<ShardResponse>() {
public ShardResponse call() throws Exception {
@@ -416,13 +486,9 @@ class HttpCommComponent {
long startTime = System.currentTimeMillis();
try {
- // String url = "http://" + shard + "/select";
- String url = SearchHandler.scheme + shard;
-
params.remove(CommonParams.WT); // use default (currently javabin)
params.remove(CommonParams.VERSION);
- SolrServer server = new CommonsHttpSolrServer(url, client);
// SolrRequest req = new QueryRequest(SolrRequest.METHOD.POST, "/select");
// use generic request to avoid extra processing of queries
QueryRequest req = new QueryRequest(params);
@@ -430,10 +496,24 @@ class HttpCommComponent {
// no need to set the response parser as binary is the default
// req.setResponseParser(new BinaryResponseParser());
- // srsp.rsp = server.request(req);
- // srsp.rsp = server.query(sreq.params);
- ssr.nl = server.request(req);
+ // if there are no shards available for a slice, urls.size()==0
+ if (urls.size()==0) {
+ // TODO: what's the right error code here? We should use the same thing when
+ // all of the servers for a shard are down.
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
+ }
+
+ if (urls.size() <= 1) {
+ String url = urls.get(0);
+ srsp.setShardAddress(url);
+ SolrServer server = new CommonsHttpSolrServer(url, client);
+ ssr.nl = server.request(req);
+ } else {
+ LBHttpSolrServer.Rsp rsp = loadbalancer.request(new LBHttpSolrServer.Req(req, urls));
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer());
+ }
} catch (Throwable th) {
srsp.setException(th);
if (th instanceof SolrException) {
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java Wed Oct 13 17:01:13 2010
@@ -41,8 +41,6 @@ public class ShardRequest {
public int purpose; // the purpose of this request
public String[] shards; // the shards this request should be sent to, null for all
-// TODO: how to request a specific shard address?
-
public ModifiableSolrParams params;
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java Wed Oct 13 17:01:13 2010
@@ -79,4 +79,9 @@ public final class ShardResponse {
{
this.rspCode = rspCode;
}
+
+  /**
+   * Returns the address of the shard that produced this response.
+   * Example: "http://localhost:8983/solr"
+   */
+  public String getShardAddress() {
+    return shardAddress;
+  }
+
+  /** Sets the shard address that {@link #getShardAddress()} reports. */
+  void setShardAddress(String addr) {
+    this.shardAddress = addr;
+  }
}
Modified: lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java (original)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java Wed Oct 13 17:01:13 2010
@@ -61,6 +61,7 @@ public class TermsComponent extends Sear
// TODO: temporary... this should go in a different component.
String shards = params.get(ShardParams.SHARDS);
if (shards != null) {
+ rb.isDistrib = true;
if (params.get(ShardParams.SHARDS_QT) == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shards.qt parameter specified");
}
@@ -298,9 +299,6 @@ public class TermsComponent extends Sear
// base shard request on original parameters
sreq.params = new ModifiableSolrParams(params);
- // don't pass through the shards param
- sreq.params.remove(ShardParams.SHARDS);
-
// remove any limits for shards, we want them to return all possible
// responses
// we want this so we can calculate the correct counts
@@ -310,11 +308,6 @@ public class TermsComponent extends Sear
sreq.params.set(TermsParams.TERMS_LIMIT, -1);
sreq.params.set(TermsParams.TERMS_SORT, TermsParams.TERMS_SORT_INDEX);
- // TODO: is there a better way to handle this?
- String qt = params.get(CommonParams.QT);
- if (qt != null) {
- sreq.params.add(CommonParams.QT, qt);
- }
return sreq;
}
Added: lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java (added)
+++ lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,148 @@
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * SolrServer implementation that reads the cloud state from ZooKeeper, collects the
+ * URLs of the live nodes hosting the requested collection, and hands the request
+ * together with that (shuffled) URL list to an {@link LBHttpSolrServer}.
+ */
+public class CloudSolrServer extends SolrServer {
+  private volatile ZkStateReader zkStateReader;
+  private final String zkHost; // the zk server address
+  private int zkConnectTimeout = 10000;
+  private int zkClientTimeout = 10000;
+  private String defaultCollection;
+  private final LBHttpSolrServer lbServer;
+  Random rand = new Random();
+
+  /**
+   * @param zkHost The address of the zookeeper quorum containing the cloud state
+   */
+  public CloudSolrServer(String zkHost) throws MalformedURLException {
+    this(zkHost, new LBHttpSolrServer());
+  }
+
+  /**
+   * @param zkHost The address of the zookeeper quorum containing the cloud state
+   * @param lbServer The load balancing server used to send the requests
+   */
+  public CloudSolrServer(String zkHost, LBHttpSolrServer lbServer) {
+    this.zkHost = zkHost;
+    this.lbServer = lbServer;
+  }
+
+  /** Sets the collection used for requests that carry no "collection" parameter */
+  public void setDefaultCollection(String collection) {
+    this.defaultCollection = collection;
+  }
+
+  /** Set the connect timeout to the zookeeper ensemble in ms */
+  public void setZkConnectTimeout(int zkConnectTimeout) {
+    this.zkConnectTimeout = zkConnectTimeout;
+  }
+
+  /** Set the timeout to the zookeeper ensemble in ms */
+  public void setZkClientTimeout(int zkClientTimeout) {
+    this.zkClientTimeout = zkClientTimeout;
+  }
+
+  /**
+   * Connect to the zookeeper ensemble.
+   * This is an optional method that may be used to force a connect before any other requests are sent.
+   * Calling it while already connected does nothing.
+   *
+   * @throws ZooKeeperException if connecting or the initial read of the cloud state fails;
+   *         the underlying InterruptedException, KeeperException, IOException or
+   *         TimeoutException is attached as the cause
+   */
+  public void connect() {
+    if (zkStateReader != null) return;
+    synchronized(this) {
+      if (zkStateReader != null) return;
+      ZkStateReader zk = null;
+      boolean connected = false;
+      try {
+        zk = new ZkStateReader(zkHost, zkConnectTimeout, zkClientTimeout);
+        zk.makeCollectionsNodeWatches();
+        zk.makeShardZkNodeWatches(false);
+        zk.updateCloudState(true);
+        // publish only a fully initialized reader
+        zkStateReader = zk;
+        connected = true;
+      } catch (InterruptedException e) {
+        // restore the interrupt flag for callers further up the stack
+        Thread.currentThread().interrupt();
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "Interrupted while connecting to ZooKeeper at " + zkHost, e);
+      } catch (KeeperException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "ZooKeeper error while reading the cloud state from " + zkHost, e);
+      } catch (IOException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "I/O error while connecting to ZooKeeper at " + zkHost, e);
+      } catch (TimeoutException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "Timed out connecting to ZooKeeper at " + zkHost, e);
+      } finally {
+        // don't leak the reader (and its ZooKeeper connection) when setup fails part-way
+        if (!connected && zk != null) {
+          zk.close();
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Sends the request to the live nodes of the target collection via the load balancer.
+   * The collection is taken from the request's "collection" parameter, falling back to
+   * the default collection.
+   *
+   * @throws SolrServerException if neither the request nor the default names a collection
+   * @throws SolrException if no slices are known for the collection
+   */
+  @Override
+  public NamedList<Object> request(SolrRequest request) throws SolrServerException, IOException {
+    connect();
+
+    CloudState cloudState = zkStateReader.getCloudState();
+
+    // guard against a request whose getParams() returns null
+    String collection = request.getParams() == null
+        ? defaultCollection
+        : request.getParams().get("collection", defaultCollection);
+    if (collection == null) {
+      throw new SolrServerException("No collection param specified on request and no default collection has been set.");
+    }
+
+    // TODO: allow multiple collections to be specified via comma separated list
+
+    Map<String,Slice> slices = cloudState.getSlices(collection);
+    if (slices == null) {
+      // fail with a clear message instead of an NPE in the loop below
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection: " + collection);
+    }
+    Set<String> liveNodes = cloudState.getLiveNodes();
+
+    // IDEA: have versions on various things... like a global cloudState version
+    // or shardAddressVersion (which only changes when the shards change)
+    // to allow caching.
+
+    // build a map of unique nodes
+    // TODO: allow filtering by group, role, etc
+    Map<String,ZkNodeProps> nodes = new HashMap<String,ZkNodeProps>();
+    List<String> urlList = new ArrayList<String>();
+    for (Slice slice : slices.values()) {
+      for (ZkNodeProps nodeProps : slice.getShards().values()) {
+        String node = nodeProps.get(ZkStateReader.NODE_NAME);
+        if (!liveNodes.contains(node)) continue;
+        // only the first shard seen on a node contributes that node's URL
+        if (nodes.put(node, nodeProps) == null) {
+          String url = nodeProps.get(ZkStateReader.URL_PROP);
+          urlList.add(url);
+        }
+      }
+    }
+
+    Collections.shuffle(urlList, rand);
+    // TODO: set distrib=true if we detected more than one shard?
+    LBHttpSolrServer.Req req = new LBHttpSolrServer.Req(request, urlList);
+    LBHttpSolrServer.Rsp rsp = lbServer.request(req);
+    return rsp.getResponse();
+  }
+
+  /** Closes the ZooKeeper state reader if one is open; a later request connects again. */
+  public void close() {
+    if (zkStateReader != null) {
+      synchronized(this) {
+        if (zkStateReader != null) {
+          zkStateReader.close();
+        }
+        zkStateReader = null;
+      }
+    }
+  }
+}