You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2010/10/13 19:01:33 UTC
svn commit: r1022188 [1/4] - in /lucene/dev/trunk/solr: ./ lib/
src/common/org/apache/solr/common/cloud/
src/common/org/apache/solr/common/params/ src/java/org/apache/solr/cloud/
src/java/org/apache/solr/core/ src/java/org/apache/solr/handler/admin/ sr...
Author: markrmiller
Date: Wed Oct 13 17:01:13 2010
New Revision: 1022188
URL: http://svn.apache.org/viewvc?rev=1022188&view=rev
Log:
SOLR-1873: SolrCloud - added shared/central config and core/shard management via zookeeper, built-in load balancing, and infrastructure for future SolrCloud work.
Added:
lucene/dev/trunk/solr/lib/log4j-over-slf4j-1.5.5.jar (with props)
lucene/dev/trunk/solr/lib/zookeeper-3.3.1.jar (with props)
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java
lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java
lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/
lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java
lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java
lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/CloudSolrServer.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/AbstractDistributedZkTestCase.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/AbstractZkTestCase.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/BasicZkTest.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/ZkControllerTest.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/ZkNodePropsTest.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/ZkSolrClientTest.java
lucene/dev/trunk/solr/src/test/org/apache/solr/cloud/ZkTestServer.java
lucene/dev/trunk/solr/src/test/test-files/solr/solr.xml
lucene/dev/trunk/solr/src/webapp/web/admin/zookeeper.jsp
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java
lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/trunk/solr/src/java/org/apache/solr/core/CoreDescriptor.java
lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrCore.java
lucene/dev/trunk/solr/src/java/org/apache/solr/core/SolrResourceLoader.java
lucene/dev/trunk/solr/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryComponent.java
lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/QueryElevationComponent.java
lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ResponseBuilder.java
lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/SearchHandler.java
lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardRequest.java
lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/ShardResponse.java
lucene/dev/trunk/solr/src/java/org/apache/solr/handler/component/TermsComponent.java
lucene/dev/trunk/solr/src/solrj/org/apache/solr/client/solrj/impl/LBHttpSolrServer.java
lucene/dev/trunk/solr/src/test/org/apache/solr/BaseDistributedSearchTestCase.java
lucene/dev/trunk/solr/src/test/org/apache/solr/handler/component/DistributedTermsComponentTest.java
lucene/dev/trunk/solr/src/webapp/src/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
lucene/dev/trunk/solr/src/webapp/web/admin/index.jsp
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Oct 13 17:01:13 2010
@@ -291,6 +291,10 @@ New Features
* SOLR-2010: Added ability to verify that spell checking collations have
actual results in the index. (James Dyer via gsingers)
+
+* SOLR-1873: SolrCloud - added shared/central config and core/shard management via zookeeper,
+ built-in load balancing, and infrastructure for future SolrCloud work.
+ (yonik, Mark Miller)
Optimizations
----------------------
Added: lucene/dev/trunk/solr/lib/log4j-over-slf4j-1.5.5.jar
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/lib/log4j-over-slf4j-1.5.5.jar?rev=1022188&view=auto
==============================================================================
Binary file - no diff available.
Propchange: lucene/dev/trunk/solr/lib/log4j-over-slf4j-1.5.5.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: lucene/dev/trunk/solr/lib/zookeeper-3.3.1.jar
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/lib/zookeeper-3.3.1.jar?rev=1022188&view=auto
==============================================================================
Binary file - no diff available.
Propchange: lucene/dev/trunk/solr/lib/zookeeper-3.3.1.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/CloudState.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,144 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Snapshot of the cluster layout stored in ZooKeeper: the names of the live
+ * nodes and, for every collection, a map from slice name to {@link Slice}.
+ * <p>
+ * Intended to be immutable: every accessor hands out an unmodifiable view.
+ * The constructor stores the collections it is given without copying them,
+ * so callers must not modify them after construction.
+ */
+public class CloudState {
+  protected static final Logger log = LoggerFactory.getLogger(CloudState.class);
+
+  private final Map<String,Map<String,Slice>> collectionStates;
+  private final Set<String> liveNodes;
+
+  /**
+   * @param liveNodes names of the live nodes
+   * @param collectionStates collection name -> (slice name -> slice)
+   */
+  public CloudState(Set<String> liveNodes, Map<String,Map<String,Slice>> collectionStates) {
+    this.liveNodes = liveNodes;
+    this.collectionStates = collectionStates;
+  }
+
+  /**
+   * @param collection collection name
+   * @return unmodifiable view of the collection's slices keyed by slice name,
+   *         or <code>null</code> if the collection is unknown
+   */
+  public Map<String,Slice> getSlices(String collection) {
+    Map<String,Slice> collectionState = collectionStates.get(collection);
+    if (collectionState == null) {
+      return null;
+    }
+    return Collections.unmodifiableMap(collectionState);
+  }
+
+  /**
+   * @return unmodifiable view of the known collection names
+   */
+  public Set<String> getCollections() {
+    return Collections.unmodifiableSet(collectionStates.keySet());
+  }
+
+  /**
+   * @return unmodifiable view of collection name -> (slice name -> slice)
+   */
+  public Map<String,Map<String,Slice>> getCollectionStates() {
+    return Collections.unmodifiableMap(collectionStates);
+  }
+
+  /**
+   * @return unmodifiable view of the live node names
+   */
+  public Set<String> getLiveNodes() {
+    return Collections.unmodifiableSet(liveNodes);
+  }
+
+  /**
+   * @param name node name
+   * @return true if <code>name</code> is among the live nodes
+   */
+  public boolean liveNodesContain(String name) {
+    return liveNodes.contains(name);
+  }
+
+  /**
+   * Builds a new state from ZooKeeper. The live nodes are always re-read.
+   * The collection/slice layout is re-read only when
+   * <code>onlyLiveNodes</code> is false; otherwise it is taken over from
+   * <code>oldCloudState</code>.
+   *
+   * @param zkClient client used for all reads
+   * @param oldCloudState previous state; must be non-null when
+   *        <code>onlyLiveNodes</code> is true, unused otherwise
+   * @param onlyLiveNodes true to refresh the live nodes only
+   * @return the new state
+   * @throws KeeperException propagated from the ZooKeeper reads
+   * @throws InterruptedException propagated from the ZooKeeper reads
+   * @throws IOException propagated from loading the shard properties
+   */
+  public static CloudState buildCloudState(SolrZkClient zkClient, CloudState oldCloudState,
+      boolean onlyLiveNodes) throws KeeperException, InterruptedException, IOException {
+    Map<String,Map<String,Slice>> collectionStates;
+    if (onlyLiveNodes) {
+      // Take the raw map, not getCollectionStates(): the getter returns an
+      // unmodifiable wrapper, and storing that wrapper in the new state would
+      // add one more wrapper layer on every live-nodes-only refresh.
+      collectionStates = oldCloudState.collectionStates;
+    } else {
+      collectionStates = readCollectionStates(zkClient);
+    }
+    return new CloudState(getLiveNodes(zkClient), collectionStates);
+  }
+
+  /**
+   * Reads every collection below {@link ZkStateReader#COLLECTIONS_ZKNODE} and
+   * the slices below its shards node.
+   */
+  private static Map<String,Map<String,Slice>> readCollectionStates(SolrZkClient zkClient)
+      throws KeeperException, InterruptedException, IOException {
+    List<String> collections = zkClient.getChildren(
+        ZkStateReader.COLLECTIONS_ZKNODE, null);
+
+    Map<String,Map<String,Slice>> collectionStates = new HashMap<String,Map<String,Slice>>();
+    for (String collection : collections) {
+      String shardIdPaths = ZkStateReader.COLLECTIONS_ZKNODE + "/"
+          + collection + ZkStateReader.SHARDS_ZKNODE;
+      List<String> shardIdNames;
+      try {
+        shardIdNames = zkClient.getChildren(shardIdPaths, null);
+      } catch (KeeperException.NoNodeException e) {
+        // node is not valid currently; leave this collection out
+        continue;
+      }
+      Map<String,Slice> slices = new HashMap<String,Slice>();
+      for (String shardIdZkPath : shardIdNames) {
+        Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths
+            + "/" + shardIdZkPath);
+        slices.put(shardIdZkPath, new Slice(shardIdZkPath, shardsMap));
+      }
+      collectionStates.put(collection, slices);
+    }
+    return collectionStates;
+  }
+
+  /**
+   * Loads the properties of every child of <code>shardsZkPath</code>.
+   *
+   * @param zkClient client used for all reads
+   * @param shardsZkPath path of the node whose children are the shards
+   * @return unmodifiable map of child node name to its loaded properties
+   * @throws IllegalStateException if <code>shardsZkPath</code> does not exist
+   * @throws KeeperException propagated from the ZooKeeper reads
+   * @throws InterruptedException propagated from the ZooKeeper reads
+   * @throws IOException propagated from {@link ZkNodeProps#load}
+   */
+  private static Map<String,ZkNodeProps> readShards(SolrZkClient zkClient, String shardsZkPath)
+      throws KeeperException, InterruptedException, IOException {
+
+    Map<String,ZkNodeProps> shardNameToProps = new HashMap<String,ZkNodeProps>();
+
+    if (zkClient.exists(shardsZkPath, null) == null) {
+      throw new IllegalStateException("Cannot find zk shards node that should exist:"
+          + shardsZkPath);
+    }
+
+    List<String> shardZkPaths = zkClient.getChildren(shardsZkPath, null);
+
+    for (String shardPath : shardZkPaths) {
+      byte[] data = zkClient.getData(shardsZkPath + "/" + shardPath, null,
+          null);
+
+      ZkNodeProps props = new ZkNodeProps();
+      props.load(data);
+      shardNameToProps.put(shardPath, props);
+    }
+
+    return Collections.unmodifiableMap(shardNameToProps);
+  }
+
+  /**
+   * Reads the children of {@link ZkStateReader#LIVE_NODES_ZKNODE} into a set.
+   */
+  private static Set<String> getLiveNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    List<String> liveNodes = zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null);
+    return new HashSet<String>(liveNodes);
+  }
+
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ConnectionManager.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,140 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper {@link Watcher} that tracks the connection state of a
+ * {@link SolrZkClient} and, when the session expires, asks the configured
+ * {@link ZkClientConnectionStrategy} to reconnect.
+ * <p>
+ * <code>process</code>, <code>isConnected</code>, <code>state</code> and the
+ * two <code>waitFor...</code> methods are synchronized on this instance;
+ * waiters are woken by the <code>notifyAll()</code> at the end of
+ * <code>process</code>.
+ */
+class ConnectionManager implements Watcher {
+ protected static final Logger log = LoggerFactory
+ .getLogger(ConnectionManager.class);
+
+ // label used in log output only
+ private final String name;
+ // counted down on SyncConnected; nothing in this class awaits it
+ private CountDownLatch clientConnected;
+ // state carried by the most recent event
+ private KeeperState state;
+ // true while the last event seen was SyncConnected (or after a completed reconnect)
+ private boolean connected;
+
+ private ZkClientConnectionStrategy connectionStrategy;
+
+ private String zkServerAddress;
+
+ private int zkClientTimeout;
+
+ private SolrZkClient client;
+
+ // optional callback run after a reconnect; may be null
+ private OnReconnect onReconnect;
+
+ /**
+ * @param name label used in log output
+ * @param client client whose keeper is replaced after a reconnect
+ * @param zkServerAddress address handed to the strategy on reconnect
+ * @param zkClientTimeout timeout handed to the strategy on reconnect
+ * @param strat strategy used to reconnect after session expiry
+ * @param onConnect callback run after a reconnect; may be null
+ */
+ public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
+ this.name = name;
+ this.client = client;
+ this.connectionStrategy = strat;
+ this.zkServerAddress = zkServerAddress;
+ this.zkClientTimeout = zkClientTimeout;
+ this.onReconnect = onConnect;
+ reset();
+ }
+
+ // puts the manager into its initial "disconnected" state
+ private synchronized void reset() {
+ clientConnected = new CountDownLatch(1);
+ state = KeeperState.Disconnected;
+ connected = false;
+ }
+
+ /**
+ * Records the state carried by <code>event</code>, starts a reconnect when
+ * the session has expired, and wakes all threads blocked in
+ * <code>waitForConnected</code> / <code>waitForDisconnected</code>.
+ */
+ public synchronized void process(WatchedEvent event) {
+ if (log.isInfoEnabled()) {
+ log.info("Watcher " + this + " name:" + name + " got event " + event
+ + " path:" + event.getPath() + " type:" + event.getType());
+ }
+
+ state = event.getState();
+ if (state == KeeperState.SyncConnected) {
+ connected = true;
+ clientConnected.countDown();
+ } else if (state == KeeperState.Expired) {
+
+ connected = false;
+ log.info("Attempting to reconnect to ZooKeeper...");
+
+ try {
+ // The strategy calls update() with the replacement keeper. The callback
+ // waits until this watcher has seen SyncConnected, installs the keeper
+ // on the client and then runs the optional onReconnect callback.
+ connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() {
+ @Override
+ public void update(SolrZooKeeper keeper) throws InterruptedException, TimeoutException, IOException {
+ waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
+ client.updateKeeper(keeper);
+ if(onReconnect != null) {
+ onReconnect.command();
+ }
+ // NOTE(review): update() itself is not synchronized, so whether this
+ // write happens under the monitor depends on which thread the
+ // strategy runs it on - confirm. waitForConnected() above only
+ // returns once connected is already true.
+ ConnectionManager.this.connected = true;
+ }
+ });
+ } catch (Exception e) {
+ log.error("", e);
+ }
+
+ log.info("Connected:" + connected);
+ } else if (state == KeeperState.Disconnected) {
+ // ZooKeeper client will recover when it can
+ // TODO: this needs to be investigated more
+ connected = false;
+ } else {
+ connected = false;
+ }
+ notifyAll();
+ }
+
+ /**
+ * @return the current value of the connected flag
+ */
+ public synchronized boolean isConnected() {
+ return connected;
+ }
+
+ /**
+ * @return the state carried by the most recent event, or Disconnected if
+ * no event has been processed yet
+ */
+ public synchronized KeeperState state() {
+ return state;
+ }
+
+ /**
+ * Blocks until the connected flag is set or the timeout elapses.
+ *
+ * @param waitForConnection maximum time to wait, in milliseconds
+ * @throws InterruptedException if interrupted while waiting
+ * @throws TimeoutException if still not connected when the time is up
+ * @throws IOException declared but not thrown by this implementation
+ */
+ public synchronized void waitForConnected(long waitForConnection)
+ throws InterruptedException, TimeoutException, IOException {
+ long expire = System.currentTimeMillis() + waitForConnection;
+ long left = waitForConnection;
+ // loop: wait() may return before the state changed or the time is up
+ while (!connected && left > 0) {
+ wait(left);
+ left = expire - System.currentTimeMillis();
+ }
+ if (!connected) {
+ throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
+ }
+ }
+
+ /**
+ * Blocks until the connected flag is cleared or the timeout elapses.
+ *
+ * @param timeout maximum time to wait, in milliseconds
+ * @throws InterruptedException if interrupted while waiting
+ * @throws TimeoutException if still connected when the time is up
+ */
+ public synchronized void waitForDisconnected(long timeout)
+ throws InterruptedException, TimeoutException {
+ long expire = System.currentTimeMillis() + timeout;
+ long left = timeout;
+ while (connected && left > 0) {
+ wait(left);
+ left = expire - System.currentTimeMillis();
+ }
+ if (connected) {
+ throw new TimeoutException("Did not disconnect");
+ }
+ }
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/DefaultConnectionStrategy.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,74 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default {@link ZkClientConnectionStrategy}: <code>connect</code> creates the
+ * ZooKeeper client on the calling thread; <code>reconnect</code> retries on a
+ * background thread, doubling the delay between attempts.
+ * <p>
+ * TODO: improve backoff retry impl
+ */
+public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
+
+  private static final Logger log = LoggerFactory.getLogger(DefaultConnectionStrategy.class);
+
+  /**
+   * Creates a new ZooKeeper client and hands it to <code>updater</code>.
+   *
+   * @param serverAddress address passed to the ZooKeeper client
+   * @param timeout timeout passed to the ZooKeeper client
+   * @param watcher watcher registered with the ZooKeeper client
+   * @param updater receives the newly created client
+   */
+  @Override
+  public void connect(String serverAddress, int timeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
+    updater.update(new SolrZooKeeper(serverAddress, timeout, watcher));
+  }
+
+  /**
+   * Schedules reconnect attempts on a dedicated single-thread executor and
+   * returns immediately. The first attempt runs after 1000 ms. After each
+   * failure the delay is doubled (while it is below 240000 ms) and the attempt
+   * is rescheduled. The executor is shut down after the first success.
+   *
+   * @param serverAddress address passed to the ZooKeeper client
+   * @param zkClientTimeout timeout passed to the ZooKeeper client
+   * @param watcher watcher registered with the ZooKeeper client
+   * @param updater receives each newly created client; an exception from it
+   *        counts as a failed attempt
+   */
+  @Override
+  public void reconnect(final String serverAddress, final int zkClientTimeout,
+      final Watcher watcher, final ZkUpdate updater) throws IOException {
+    log.info("Starting reconnect to ZooKeeper attempts ...");
+    // One executor per reconnect() call, held in a local: a shared field would
+    // be overwritten by a later reconnect(), so a finishing task could shut
+    // down the wrong pool and leave its own running forever.
+    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    executor.schedule(new Runnable() {
+      private int delay = 1000;
+      public void run() {
+        log.info("Attempting the connect...");
+        boolean connected = false;
+        SolrZooKeeper zk = null;
+        try {
+          zk = new SolrZooKeeper(serverAddress, zkClientTimeout, watcher);
+          updater.update(zk);
+          log.info("Reconnected to ZooKeeper");
+          connected = true;
+        } catch (Exception e) {
+          log.error("", e);
+          log.info("Reconnect to ZooKeeper failed");
+          // Do not leak the client of a failed attempt: the next attempt
+          // creates a fresh one.
+          closeQuietly(zk);
+        }
+        if (connected) {
+          executor.shutdownNow();
+        } else {
+          if (delay < 240000) {
+            delay = delay * 2;
+          }
+          executor.schedule(this, delay, TimeUnit.MILLISECONDS);
+        }
+
+      }
+    }, 1000, TimeUnit.MILLISECONDS);
+  }
+
+  /** Closes <code>zk</code> if non-null, restoring the interrupt flag if interrupted. */
+  private static void closeQuietly(SolrZooKeeper zk) {
+    if (zk == null) {
+      return;
+    }
+    try {
+      zk.close();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/OnReconnect.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,22 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Callback invoked by the connection manager after the ZooKeeper client has
+ * been replaced following a session expiry.
+ */
+public interface OnReconnect {
+  /** Runs the reconnect action. */
+  void command();
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/Slice.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,41 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+
+/**
+ * A named slice together with the properties of its shards, keyed by shard
+ * name. Intended to be immutable: the shard map is only ever handed out as an
+ * unmodifiable view (the map passed to the constructor is not copied).
+ */
+public class Slice {
+  private final String name;
+  private final Map<String,ZkNodeProps> shards;
+
+  /**
+   * @param name slice name
+   * @param shards shard name -> shard properties
+   */
+  public Slice(String name, Map<String,ZkNodeProps> shards) {
+    this.name = name;
+    this.shards = shards;
+  }
+
+  /**
+   * @return slice name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return unmodifiable view of shard name -> shard properties
+   */
+  public Map<String,ZkNodeProps> getShards() {
+    final Map<String,ZkNodeProps> readOnlyShards = Collections.unmodifiableMap(shards);
+    return readOnlyShards;
+  }
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZkClient.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,493 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * All Solr ZooKeeper interactions should go through this class rather than
+ * ZooKeeper. This class handles synchronous connects and reconnections.
+ *
+ */
+public class SolrZkClient {
+ // platform line separator
+ static final String NEWL = System.getProperty("line.separator");
+
+ // default time to wait for the connection to be established; handed to
+ // ConnectionManager.waitForConnected, which treats it as milliseconds
+ static final int DEFAULT_CLIENT_CONNECT_TIMEOUT = 30000;
+
+ private static final Logger log = LoggerFactory
+ .getLogger(SolrZkClient.class);
+
+ // watcher that tracks the connection state; created in the constructor
+ private ConnectionManager connManager;
+
+ // current ZooKeeper handle that every operation delegates to; volatile
+ // because it is replaced when a new connection is installed
+ private volatile SolrZooKeeper keeper;
+
+ /**
+ * Creates a client that uses a {@link DefaultConnectionStrategy}, no
+ * {@link OnReconnect} callback and the default connect timeout
+ * ({@link #DEFAULT_CLIENT_CONNECT_TIMEOUT}).
+ *
+ * @param zkServerAddress ZooKeeper address handed to the connection strategy
+ * @param zkClientTimeout client timeout handed to the connection strategy
+ * @throws InterruptedException if interrupted while waiting for the connection
+ * @throws TimeoutException if the connection is not established in time
+ * @throws IOException propagated from the connection strategy
+ */
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout) throws InterruptedException, TimeoutException, IOException {
+ this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
+ }
+
+ /**
+ * Creates a client that uses a {@link DefaultConnectionStrategy} with an
+ * explicit connect timeout and reconnect callback.
+ *
+ * @param zkServerAddress ZooKeeper address handed to the connection strategy
+ * @param zkClientTimeout client timeout handed to the connection strategy
+ * @param zkClientConnectTimeout maximum time to wait for the connection
+ * @param onReonnect callback run after a reconnect; may be null
+ * @throws InterruptedException if interrupted while waiting for the connection
+ * @throws TimeoutException if the connection is not established in time
+ * @throws IOException propagated from the connection strategy
+ */
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) throws InterruptedException, TimeoutException, IOException {
+ this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), onReonnect, zkClientConnectTimeout);
+ }
+
+ /**
+ * Creates a client with the given connection strategy and the default
+ * connect timeout ({@link #DEFAULT_CLIENT_CONNECT_TIMEOUT}).
+ *
+ * @param zkServerAddress ZooKeeper address handed to the connection strategy
+ * @param zkClientTimeout client timeout handed to the connection strategy
+ * @param strat strategy used to connect and reconnect
+ * @param onReconnect callback run after a reconnect; may be null
+ * @throws InterruptedException if interrupted while waiting for the connection
+ * @throws TimeoutException if the connection is not established in time
+ * @throws IOException propagated from the connection strategy
+ */
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout,
+ ZkClientConnectionStrategy strat, final OnReconnect onReconnect) throws InterruptedException,
+ TimeoutException, IOException {
+ this(zkServerAddress, zkClientTimeout, strat, onReconnect, DEFAULT_CLIENT_CONNECT_TIMEOUT);
+ }
+
+ /**
+  * Creates a client, opens the ZooKeeper connection through
+  * <code>strat</code> and blocks until the connection is established or
+  * <code>clientConnectTimeout</code> elapses.
+  * <p>
+  * If the wait fails, the ZooKeeper handle created so far is closed before
+  * the exception is propagated, so a failed construction does not leave a
+  * live client behind.
+  *
+  * @param zkServerAddress ZooKeeper address handed to the connection strategy
+  * @param zkClientTimeout client timeout handed to the connection strategy
+  * @param strat strategy used to connect and reconnect
+  * @param onReconnect callback run after a reconnect; may be null
+  * @param clientConnectTimeout maximum time to wait for the connection
+  * @throws InterruptedException if interrupted while waiting for the connection
+  * @throws TimeoutException if the connection is not established in time
+  * @throws IOException propagated from the connection strategy
+  */
+ public SolrZkClient(String zkServerAddress, int zkClientTimeout,
+     ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException,
+     TimeoutException, IOException {
+   connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+       + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
+   strat.connect(zkServerAddress, zkClientTimeout, connManager,
+       new ZkUpdate() {
+         @Override
+         public void update(SolrZooKeeper zooKeeper) {
+           // install the new handle first, then release the one it replaces
+           SolrZooKeeper oldKeeper = keeper;
+           keeper = zooKeeper;
+           if (oldKeeper != null) {
+             try {
+               oldKeeper.close();
+             } catch (InterruptedException e) {
+               // Restore the interrupted status
+               Thread.currentThread().interrupt();
+               log.error("", e);
+               throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                   "", e);
+             }
+           }
+         }
+       });
+   boolean connected = false;
+   try {
+     connManager.waitForConnected(clientConnectTimeout);
+     connected = true;
+   } finally {
+     if (!connected) {
+       // The caller never receives this instance, so nobody else can close
+       // the handle; release it here. The original exception still propagates.
+       SolrZooKeeper failedKeeper = keeper;
+       if (failedKeeper != null) {
+         try {
+           failedKeeper.close();
+         } catch (InterruptedException e) {
+           // Restore the interrupted status
+           Thread.currentThread().interrupt();
+         }
+       }
+     }
+   }
+ }
+
+ /**
+ * Checks the state reported by the current ZooKeeper handle.
+ *
+ * @return true if a handle exists and its state is
+ * <code>ZooKeeper.States.CONNECTED</code>
+ */
+ public boolean isConnected() {
+ return keeper != null && keeper.getState() == ZooKeeper.States.CONNECTED;
+ }
+
+ /**
+ * Deletes a node; delegates directly to the ZooKeeper handle.
+ *
+ * @param path the node path
+ * @param version node version, passed through unchanged to ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ * @throws KeeperException propagated from ZooKeeper
+ */
+ public void delete(final String path, int version)
+ throws InterruptedException, KeeperException {
+ keeper.delete(path, version);
+ }
+
+ /**
+ * Returns the stat of the node at the given path, or null if no such node
+ * exists. Delegates directly to the ZooKeeper handle.
+ * <p>
+ * If the watch is non-null and the call is successful (no exception is thrown),
+ * a watch will be left on the node with the given path. The watch will be
+ * triggered by a successful operation that creates/deletes the node or sets
+ * the data on the node.
+ *
+ * @param path the node path
+ * @param watcher explicit watcher; may be null
+ * @return the stat of the node at the given path; null if no such node
+ * exists.
+ * @throws KeeperException If the server signals an error
+ * @throws InterruptedException If the server transaction is interrupted.
+ * @throws IllegalArgumentException if an invalid path is specified
+ */
+ public Stat exists(final String path, Watcher watcher)
+ throws KeeperException, InterruptedException {
+ return keeper.exists(path, watcher);
+ }
+
+ /**
+ * Checks whether a node exists, without leaving a watch.
+ *
+ * @param path the node path
+ * @return true if ZooKeeper returns a stat for the path, false otherwise
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public boolean exists(final String path)
+ throws KeeperException, InterruptedException {
+ return keeper.exists(path, null) != null;
+ }
+
+ /**
+ * Creates a node with an explicit ACL; delegates directly to the ZooKeeper
+ * handle.
+ *
+ * @param path the node path
+ * @param data data to store on the node
+ * @param acl ACL for the new node
+ * @param createMode create mode for the new node
+ * @return the path returned by ZooKeeper
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public String create(final String path, byte data[], List<ACL> acl,
+ CreateMode createMode) throws KeeperException, InterruptedException {
+ return keeper.create(path, data, acl, createMode);
+ }
+
+ /**
+ * Lists the children of a node; delegates directly to the ZooKeeper handle.
+ *
+ * @param path the node path
+ * @param watcher watcher passed through to ZooKeeper; may be null
+ * @return the list returned by ZooKeeper
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public List<String> getChildren(final String path, Watcher watcher)
+ throws KeeperException, InterruptedException {
+ return keeper.getChildren(path, watcher);
+ }
+
+ /**
+ * Reads the data of a node; delegates directly to the ZooKeeper handle.
+ *
+ * @param path the node path
+ * @param watcher watcher passed through to ZooKeeper; may be null
+ * @param stat stat object passed through to ZooKeeper; may be null
+ * @return the bytes returned by ZooKeeper
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public byte[] getData(final String path, Watcher watcher, Stat stat)
+ throws KeeperException, InterruptedException {
+ return keeper.getData(path, watcher, stat);
+ }
+
+ /**
+ * Sets the data of a node; delegates directly to the ZooKeeper handle.
+ *
+ * @param path the node path
+ * @param data data to store on the node
+ * @param version node version, passed through unchanged to ZooKeeper
+ * @return the stat returned by ZooKeeper
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public Stat setData(final String path, byte data[], int version)
+ throws KeeperException, InterruptedException {
+ return keeper.setData(path, data, version);
+ }
+
+ /**
+ * Creates a node using the open ACL
+ * (<code>ZooDefs.Ids.OPEN_ACL_UNSAFE</code>).
+ *
+ * @param path the node path
+ * @param data data to store on the node
+ * @param createMode create mode for the new node
+ * @return the path returned by ZooKeeper
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public String create(String path, byte[] data, CreateMode createMode) throws KeeperException, InterruptedException {
+
+ String zkPath = keeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
+
+ return zkPath;
+ }
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary. No data is
+ * supplied and <code>CreateMode.PERSISTENT</code> is passed as the create
+ * mode.
+ *
+ * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+ * group, node exist, each will be created.
+ *
+ * @param path the path to create
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public void makePath(String path) throws KeeperException,
+ InterruptedException {
+ makePath(path, null, CreateMode.PERSISTENT);
+ }
+
+ /**
+ * Creates the path in ZooKeeper without supplying data, forwarding the given
+ * create mode to the data-taking overload.
+ *
+ * @param path the path to create
+ * @param createMode create mode forwarded unchanged
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public void makePath(String path, CreateMode createMode) throws KeeperException,
+ InterruptedException {
+ makePath(path, null, createMode);
+ }
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary.
+ * <code>CreateMode.PERSISTENT</code> is passed as the create mode.
+ *
+ * @param path the path to create
+ * @param data to set on the last zkNode
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public void makePath(String path, byte[] data) throws KeeperException,
+ InterruptedException {
+ makePath(path, data, CreateMode.PERSISTENT);
+ }
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary. No watcher
+ * is used.
+ *
+ * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+ * group, node exist, each will be created.
+ *
+ * @param path the path to create
+ * @param data to set on the last zkNode
+ * @param createMode create mode forwarded unchanged
+ * @throws KeeperException propagated from ZooKeeper
+ * @throws InterruptedException propagated from ZooKeeper
+ */
+ public void makePath(String path, byte[] data, CreateMode createMode)
+ throws KeeperException, InterruptedException {
+ makePath(path, data, createMode, null);
+ }
+
+ /**
+  * Creates the path in ZooKeeper, creating each node as necessary. Delegates
+  * to the five-argument overload with <code>failOnExists</code> set to false.
+  *
+  * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+  * group, node exist, each will be created.
+  *
+  * @param path full path to create
+  * @param data to set on the last zkNode
+  * @param createMode mode for the last node of the path
+  * @param watcher watcher handed on to the five-argument overload
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void makePath(String path, byte[] data, CreateMode createMode,
+     Watcher watcher) throws KeeperException, InterruptedException {
+   final boolean failOnExists = false;
+   makePath(path, data, createMode, watcher, failOnExists);
+ }
+
+ /**
+ * Creates the path in ZooKeeper, creating each node as necessary.
+ *
+ * e.g. If <code>path=/solr/group/node</code> and none of the nodes, solr,
+ * group, node exist, each will be created.
+ *
+ * Missing intermediate nodes are created as <code>CreateMode.PERSISTENT</code>
+ * with no data; only the last node receives <code>data</code> and
+ * <code>createMode</code>. If the last node already exists and
+ * <code>failOnExists</code> is false, its data is overwritten with
+ * <code>data</code> - even when <code>data</code> is null.
+ *
+ * @param path full path to create; a single leading "/" is optional
+ * @param data to set on the last zkNode
+ * @param createMode mode used for the last node only
+ * @param watcher passed to every exists() check made along the path
+ * @param failOnExists when true, create is attempted on the last node even
+ * if it already exists (presumably surfacing ZooKeeper's node-exists error
+ * to the caller - confirm against the ZooKeeper create contract)
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void makePath(String path, byte[] data, CreateMode createMode,
+ Watcher watcher, boolean failOnExists) throws KeeperException, InterruptedException {
+ if (log.isInfoEnabled()) {
+ log.info("makePath: " + path);
+ }
+
+ // drop the leading slash so split() does not yield an empty first piece
+ if (path.startsWith("/")) {
+ path = path.substring(1, path.length());
+ }
+ String[] paths = path.split("/");
+ // accumulates the absolute path of the node handled in each iteration
+ StringBuilder sbPath = new StringBuilder();
+ for (int i = 0; i < paths.length; i++) {
+ byte[] bytes = null;
+ String pathPiece = paths[i];
+ sbPath.append("/" + pathPiece);
+ String currentPath = sbPath.toString();
+ Object exists = exists(currentPath, watcher);
+ // create when the node is missing, or unconditionally for the last node
+ // when the caller asked for failOnExists
+ if (exists == null || ((i == paths.length -1) && failOnExists)) {
+ CreateMode mode = CreateMode.PERSISTENT;
+ if (i == paths.length - 1) {
+ // only the last node gets the caller's mode and data
+ mode = createMode;
+ bytes = data;
+ }
+ keeper.create(currentPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+ if(i == paths.length -1) {
+ // set new watch
+ exists(currentPath, watcher);
+ }
+ } else if (i == paths.length - 1) {
+ // last node already exists: overwrite its data (also when data is null)
+ // TODO: version ? for now, don't worry about race
+ setData(currentPath, data, -1);
+ // set new watch
+ exists(currentPath, watcher);
+ }
+ }
+ }
+
+ /**
+  * Creates the path in ZooKeeper with no data on the last node.
+  *
+  * @param zkPath full path to create
+  * @param createMode mode for the last node of the path
+  * @param watcher watcher handed on to the data-taking overload
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void makePath(String zkPath, CreateMode createMode, Watcher watcher)
+     throws KeeperException, InterruptedException {
+   final byte[] noData = null;
+   makePath(zkPath, noData, createMode, watcher);
+ }
+
+ /**
+  * Write data to ZooKeeper, first making sure the whole path exists.
+  *
+  * @param path node to write to
+  * @param data bytes to store on the node
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void setData(String path, byte[] data) throws KeeperException,
+     InterruptedException {
+
+   makePath(path);
+
+   Object stat = exists(path, null);
+   if (stat == null) {
+     // node is not there after makePath: create it with the payload
+     create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+     return;
+   }
+   setData(path, data, -1);
+ }
+
+ /**
+  * Write file to ZooKeeper. The file is read as text without an explicit
+  * encoding (default system encoding) and stored as UTF-8 bytes.
+  *
+  * @param path path to upload file to e.g. /solr/conf/solrconfig.xml
+  * @param file path to file to be uploaded
+  * @throws IOException
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void setData(String path, File file) throws IOException,
+     KeeperException, InterruptedException {
+   if (log.isInfoEnabled()) {
+     log.info("Write to ZooKeepeer " + file.getAbsolutePath() + " to " + path);
+   }
+
+   final String contents = FileUtils.readFileToString(file);
+   final byte[] payload = contents.getBytes("UTF-8");
+   setData(path, payload);
+ }
+
+ /**
+  * Appends a printout of the ZooKeeper tree rooted at <code>path</code> to
+  * <code>string</code>, recursing into every child except "quota".
+  *
+  * @param path node to start printing from
+  * @param indent nesting depth; one space of indentation is emitted per level
+  * @param string buffer the printout is appended to
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void printLayout(String path, int indent, StringBuilder string)
+     throws KeeperException, InterruptedException {
+   byte[] data = getData(path, null, null);
+   List<String> children = getChildren(path, null);
+
+   StringBuilder padBuilder = new StringBuilder();
+   int level = 0;
+   while (level < indent) {
+     padBuilder.append(" ");
+     level++;
+   }
+   final String pad = padBuilder.toString();
+
+   string.append(pad + path + " (" + children.size() + ")" + NEWL);
+
+   if (data != null) {
+     final String dataString;
+     try {
+       dataString = new String(data, "UTF-8");
+     } catch (UnsupportedEncodingException e) {
+       // can't happen - UTF-8
+       throw new RuntimeException(e);
+     }
+     // content of .txt and .xml nodes is not printed
+     final boolean suppressed = path.endsWith(".txt") || path.endsWith(".xml");
+     if (suppressed) {
+       string.append(pad + "DATA: ...supressed..." + NEWL);
+     } else {
+       string.append(pad + "DATA:\n" + pad + " "
+           + dataString.replaceAll("\n", "\n" + pad + " ") + NEWL);
+     }
+   }
+
+   final String childPrefix = path.equals("/") ? path : path + "/";
+   for (String child : children) {
+     if (child.equals("quota")) {
+       continue;
+     }
+     printLayout(childPrefix + child, indent + 1, string);
+   }
+
+ }
+
+ /**
+  * Prints the ZooKeeper layout, starting at the root node "/", to stdout.
+  *
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void printLayoutToStdOut() throws KeeperException,
+     InterruptedException {
+   final StringBuilder layout = new StringBuilder();
+   printLayout("/", 0, layout);
+   final String rendered = layout.toString();
+   System.out.println(rendered);
+ }
+
+ /**
+  * Closes the ZooKeeper handle currently held by this client.
+  *
+  * @throws InterruptedException
+  */
+ public void close() throws InterruptedException {
+   final SolrZooKeeper current = keeper;
+   current.close();
+ }
+
+ /**
+ * Allows package private classes to update volatile ZooKeeper.
+ *
+ * @param keeper
+ * @throws InterruptedException
+ */
+ void updateKeeper(SolrZooKeeper keeper) throws InterruptedException {
+ SolrZooKeeper oldKeeper = this.keeper;
+ this.keeper = keeper;
+ if (oldKeeper != null) {
+ oldKeeper.close();
+ }
+ }
+
+ /**
+  * @return the ZooKeeper handle this client currently holds
+  */
+ public SolrZooKeeper getSolrZooKeeper() {
+   final SolrZooKeeper current = keeper;
+   return current;
+ }
+
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/SolrZooKeeper.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,37 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.zookeeper.ClientCnxn;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * ZooKeeper subclass that makes the inherited client connection object
+ * reachable through a public accessor.
+ */
+public class SolrZooKeeper extends ZooKeeper {
+
+  /**
+   * Passes all arguments straight to the ZooKeeper constructor.
+   *
+   * @param connectString forwarded to ZooKeeper
+   * @param sessionTimeout forwarded to ZooKeeper
+   * @param watcher forwarded to ZooKeeper
+   * @throws IOException if the superclass constructor throws it
+   */
+  public SolrZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
+      throws IOException {
+    super(connectString, sessionTimeout, watcher);
+  }
+
+  /**
+   * @return the inherited <code>cnxn</code> connection object
+   */
+  public ClientCnxn getConnection() {
+    return this.cnxn;
+  }
+
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkClientConnectionStrategy.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,36 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ *
+ */
+public abstract class ZkClientConnectionStrategy {
+ public abstract void connect(String zkServerAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+ public abstract void reconnect(String serverAddress, int zkClientTimeout, Watcher watcher, ZkUpdate updater) throws IOException, InterruptedException, TimeoutException;
+
+ public static abstract class ZkUpdate {
+ public abstract void update(SolrZooKeeper zooKeeper) throws InterruptedException, TimeoutException, IOException;
+ }
+
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkNodeProps.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,58 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Map.Entry;
+
+/**
+ * String-to-String property map that is serialized as UTF-8 text with one
+ * <code>key=value</code> entry per line, each line terminated by "\n".
+ */
+public class ZkNodeProps extends HashMap<String,String> {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Parses <code>key=value</code> lines from <code>bytes</code> (decoded as
+   * UTF-8) and puts them into this map. Entries already present are kept
+   * unless a parsed key replaces them. The key is everything before the first
+   * '=' on a line; the value is everything after it.
+   *
+   * Empty lines are skipped, so the empty payload that {@link #store()}
+   * produces for an empty map loads cleanly.
+   *
+   * @param bytes serialized properties
+   * @throws IOException if a non-empty line contains no '=' separator
+   */
+  public void load(byte[] bytes) throws IOException {
+    String stringRep = new String(bytes, "UTF-8");
+    String[] lines = stringRep.split("\n");
+    for (String line : lines) {
+      if (line.length() == 0) {
+        // "".split("\n") yields one empty element; blank lines carry no entry
+        continue;
+      }
+      int sepIndex = line.indexOf('=');
+      if (sepIndex < 0) {
+        throw new IOException("Malformed property line (no '=' separator): " + line);
+      }
+      put(line.substring(0, sepIndex), line.substring(sepIndex + 1));
+    }
+  }
+
+  /**
+   * @return the entries of this map as UTF-8 encoded <code>key=value</code>
+   *         lines
+   * @throws IOException
+   */
+  public byte[] store() throws IOException {
+    return serialize().getBytes("UTF-8");
+  }
+
+  /**
+   * @return the same <code>key=value</code> line format that
+   *         {@link #store()} encodes
+   */
+  @Override
+  public String toString() {
+    return serialize();
+  }
+
+  // Single source of truth for the text format shared by store() and toString().
+  private String serialize() {
+    StringBuilder sb = new StringBuilder();
+    for (Entry<String,String> entry : entrySet()) {
+      sb.append(entry.getKey()).append('=').append(entry.getValue()).append('\n');
+    }
+    return sb.toString();
+  }
+
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZkStateReader.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,400 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkStateReader {
+ private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
+
+ // zk node names and property keys used by this reader and its callers
+ public static final String COLLECTIONS_ZKNODE = "/collections";
+ public static final String URL_PROP = "url";
+ public static final String NODE_NAME = "node_name";
+ public static final String SHARDS_ZKNODE = "/shards";
+ public static final String LIVE_NODES_ZKNODE = "/live_nodes";
+
+ // latest published state; starts out empty and is replaced wholesale on update
+ private volatile CloudState cloudState = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
+
+ // delay in milliseconds before a scheduled (non-immediate) state update runs;
+ // overridable through the CLOUD_UPDATE_DELAY system property
+ private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
+
+ // creates daemon threads in a dedicated "ZkStateReader" thread group
+ private static class ZKTF implements ThreadFactory {
+ private static ThreadGroup tg = new ThreadGroup("ZkStateReader");
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread td = new Thread(tg, r);
+ td.setDaemon(true);
+ return td;
+ }
+ }
+ // single-threaded scheduler that runs the delayed cloud state updates
+ private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1, new ZKTF());
+
+ // true while a delayed update is pending; read and written under the update lock
+ private boolean cloudStateUpdateScheduled;
+
+ private SolrZkClient zkClient;
+
+ // set when this reader created zkClient itself; close() then closes the client
+ private boolean closeClient = false;
+
+ /**
+ * Creates a reader on top of an existing client. closeClient keeps its
+ * default of false, so close() will not close the supplied client.
+ *
+ * @param zkClient client used for all ZooKeeper access
+ */
+ public ZkStateReader(SolrZkClient zkClient) {
+ this.zkClient = zkClient;
+ }
+
+ /**
+ * Creates a reader that builds its own SolrZkClient; close() will close
+ * that client. The OnReconnect command handed to the client re-creates the
+ * collection and shard watches and schedules a cloud state update.
+ *
+ * @param zkServerAddress passed to the SolrZkClient constructor
+ * @param zkClientTimeout passed to the SolrZkClient constructor
+ * @param zkClientConnectTimeout passed to the SolrZkClient constructor
+ * @throws InterruptedException
+ * @throws TimeoutException
+ * @throws IOException
+ */
+ public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
+ // this reader owns the client it is about to create
+ closeClient = true;
+ zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
+ // on reconnect, reload cloud info
+ new OnReconnect() {
+
+ public void command() {
+ try {
+ makeCollectionsNodeWatches();
+ // true: re-create the shard watches even for already known shards
+ makeShardsWatches(true);
+ // false: schedule a delayed update instead of updating inline
+ updateCloudState(false);
+ } catch (KeeperException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }
+ });
+ }
+
+ /**
+  * Loads and publishes a new CloudState covering the full state (not just
+  * the live nodes).
+  *
+  * @param immediate true to rebuild the state in the calling thread, false
+  *        to schedule a delayed rebuild
+  */
+ public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
+     IOException {
+   final boolean onlyLiveNodes = false;
+   updateCloudState(immediate, onlyLiveNodes);
+ }
+
+ /**
+  * Immediately rebuilds and publishes the cloud state, asking the builder to
+  * refresh only the live nodes.
+  */
+ public void updateLiveNodes() throws KeeperException, InterruptedException,
+     IOException {
+   final boolean immediate = true;
+   final boolean onlyLiveNodes = true;
+   updateCloudState(immediate, onlyLiveNodes);
+ }
+
+ // load and publish a new CollectionInfo
+ //
+ // immediate == true : rebuild the state in the calling thread and publish it.
+ // immediate == false: schedule one rebuild CLOUD_UPDATE_DELAY ms from now;
+ // further non-immediate calls made while one is pending return without
+ // doing anything.
+ // onlyLiveNodes is handed through to CloudState.buildCloudState.
+ // The method is synchronized on this instance, which is the same object
+ // getUpdateLock() returns and the scheduled task synchronizes on.
+ private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
+ IOException {
+
+ // TODO: - possibly: incremental update rather than reread everything
+
+ // build immutable CloudInfo
+
+ if(immediate) {
+ if(!onlyLiveNodes) {
+ log.info("Updating cloud state from ZooKeeper... ");
+ } else {
+ log.info("Updating live nodes from ZooKeeper... ");
+ }
+ CloudState cloudState;
+ cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
+ // update volatile
+ this.cloudState = cloudState;
+ } else {
+ // coalesce: at most one delayed update is pending at any time
+ if(cloudStateUpdateScheduled) {
+ log.info("Cloud state update for ZooKeeper already scheduled");
+ return;
+ }
+ log.info("Scheduling cloud state update from ZooKeeper...");
+ cloudStateUpdateScheduled = true;
+ updateCloudExecutor.schedule(new Runnable() {
+
+ public void run() {
+ log.info("Updating cloud state from ZooKeeper...");
+ synchronized (getUpdateLock()) {
+ // cleared before the rebuild, so the flag is reset on every exit path
+ cloudStateUpdateScheduled = false;
+ CloudState cloudState;
+ try {
+ cloudState = CloudState.buildCloudState(zkClient,
+ ZkStateReader.this.cloudState, onlyLiveNodes);
+ } catch (KeeperException e) {
+ // session expiry / connection loss: keep the previously published state
+ if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ }
+ // update volatile
+ ZkStateReader.this.cloudState = cloudState;
+ }
+ }
+ }, CLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+ }
+
+ }
+
+ /**
+  * Sets a child watch on the shards zk node of every collection that is not
+  * yet part of the current cloud state (or of every collection when
+  * <code>makeWatchesForReconnect</code> is true).
+  *
+  * When the shards node does not exist yet, the watch is retried up to five
+  * times with a 100 ms pause in between; after the fifth failure an error is
+  * logged and the collection is skipped.
+  *
+  * @param makeWatchesForReconnect true to set watches for all collections,
+  *        not only the ones missing from the current cloud state
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void makeShardZkNodeWatches(boolean makeWatchesForReconnect) throws KeeperException, InterruptedException {
+   CloudState cloudState = getCloudState();
+
+   Set<String> knownCollections = cloudState.getCollections();
+   List<String> collections = zkClient.getChildren(COLLECTIONS_ZKNODE, null);
+
+   for(final String collection : collections) {
+     if(makeWatchesForReconnect || !knownCollections.contains(collection)) {
+       log.info("Found new collection:" + collection);
+       Watcher watcher = new Watcher() {
+         public void process(WatchedEvent event) {
+           log.info("Detected changed ShardId in collection:" + collection);
+           try {
+             makeShardsWatches(collection, false);
+             updateCloudState(false);
+           } catch (KeeperException e) {
+             if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+               log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+               return;
+             }
+             log.error("", e);
+             throw new ZooKeeperException(
+                 SolrException.ErrorCode.SERVER_ERROR, "", e);
+           } catch (InterruptedException e) {
+             // Restore the interrupted status
+             Thread.currentThread().interrupt();
+             log.error("", e);
+             throw new ZooKeeperException(
+                 SolrException.ErrorCode.SERVER_ERROR, "", e);
+           } catch (IOException e) {
+             log.error("", e);
+             throw new ZooKeeperException(
+                 SolrException.ErrorCode.SERVER_ERROR, "", e);
+           }
+         }
+       };
+       String shardZkNode = COLLECTIONS_ZKNODE + "/" + collection
+           + SHARDS_ZKNODE;
+       for (int i = 0; i < 5; i++) {
+         try {
+           zkClient.getChildren(shardZkNode, watcher);
+           // Success is decided per attempt. The previous shared flag stayed
+           // false after the first NoNodeException, so a later successful
+           // attempt neither logged nor stopped the retry loop.
+           log.info("Made shard watch:" + shardZkNode);
+           break;
+         } catch (KeeperException.NoNodeException e) {
+           // most likely, the collections node has been created, but not the
+           // shards node yet -- pause and try again
+           if (i == 4) {
+             log.error("Could not set shards zknode watch, because the zknode does not exist:" + shardZkNode);
+             break;
+           }
+           Thread.sleep(100);
+         }
+       }
+     }
+   }
+ }
+
+ public void makeShardsWatches(final String collection, boolean makeWatchesForReconnect) throws KeeperException,
+ InterruptedException {
+ if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + SHARDS_ZKNODE)) {
+ List<String> shardIds = zkClient.getChildren(COLLECTIONS_ZKNODE + "/"
+ + collection + SHARDS_ZKNODE, null);
+ CloudState cloudState = getCloudState();
+ Set<String> knownShardIds;
+ Map<String,Slice> slices = cloudState.getSlices(collection);
+ if (slices != null) {
+ knownShardIds = slices.keySet();
+ } else {
+ knownShardIds = new HashSet<String>(0);
+ }
+ for (final String shardId : shardIds) {
+ if (makeWatchesForReconnect || !knownShardIds.contains(shardId)) {
+ zkClient.getChildren(COLLECTIONS_ZKNODE + "/" + collection
+ + SHARDS_ZKNODE + "/" + shardId, new Watcher() {
+
+ public void process(WatchedEvent event) {
+ log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
+ try {
+ updateCloudState(false);
+ } catch (KeeperException e) {
+ if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+ }
+ });
+ }
+ }
+ }
+ }
+
+ /**
+  * Runs {@link #makeShardsWatches(String, boolean)} for every child of the
+  * collections zk node.
+  *
+  * @param makeWatchesForReconnect handed through for each collection
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public void makeShardsWatches(boolean makeWatchesForReconnect) throws KeeperException, InterruptedException {
+   for (final String name : zkClient.getChildren(COLLECTIONS_ZKNODE, null)) {
+     makeShardsWatches(name, makeWatchesForReconnect);
+   }
+ }
+
+ /**
+  * @return the most recently published information about the cluster from
+  *         ZooKeeper
+  */
+ public CloudState getCloudState() {
+   final CloudState snapshot = cloudState;
+   return snapshot;
+ }
+
+ /**
+ * @return the monitor guarding cloud state updates. It is this instance, so
+ * it is the same lock the synchronized updateCloudState method holds.
+ */
+ public Object getUpdateLock() {
+ return this;
+ }
+
+ /**
+  * Closes the ZooKeeper client, but only when this reader created it.
+  */
+ public void close() {
+   if (!closeClient) {
+     // the client was supplied by the caller; leave it open
+     return;
+   }
+   try {
+     zkClient.close();
+   } catch (InterruptedException e) {
+     // Restore the interrupted status
+     Thread.currentThread().interrupt();
+     log.error("", e);
+     throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+         e);
+   }
+ }
+
+ /**
+ * Registers two watchers on the collections zk node: a child watcher that
+ * reacts to added or removed collections, and an exists watcher that reacts
+ * to NodeDataChanged events. Both refresh the shard node watches, schedule a
+ * cloud state update and then register themselves again.
+ *
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ public void makeCollectionsNodeWatches() throws KeeperException, InterruptedException {
+ log.info("Start watching collections zk node for changes");
+ zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+
+ public void process(WatchedEvent event) {
+ try {
+
+ log.info("Detected a new or removed collection");
+ synchronized (getUpdateLock()) {
+ makeShardZkNodeWatches(false);
+ updateCloudState(false);
+ }
+ // re-watch
+ // NOTE(review): the watch is only re-registered when the event carries
+ // a path and no exception was thrown above
+ String path = event.getPath();
+ if (path != null) {
+ zkClient.getChildren(path, this);
+ }
+ } catch (KeeperException e) {
+ if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }});
+
+ zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+
+ public void process(WatchedEvent event) {
+ // NOTE(review): events of any other type return here without
+ // re-registering this watcher
+ if(event.getType() != EventType.NodeDataChanged) {
+ return;
+ }
+ log.info("Notified of CloudState change");
+ try {
+ synchronized (getUpdateLock()) {
+ makeShardZkNodeWatches(false);
+ updateCloudState(false);
+ }
+ // re-watch
+ zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE, this);
+ } catch (KeeperException e) {
+ if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (IOException e) {
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ }
+
+ }});
+
+ }
+}
Added: lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java (added)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/cloud/ZooKeeperException.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,33 @@
+package org.apache.solr.common.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.solr.common.SolrException;
+
+/**
+ * SolrException subtype used by the cloud classes to report ZooKeeper
+ * related failures.
+ */
+public class ZooKeeperException extends SolrException {
+
+  /** Creates an exception with an error code and message, without a cause. */
+  public ZooKeeperException(ErrorCode code, String msg) {
+    super(code, msg);
+  }
+
+  /** Creates an exception with an error code, message and underlying cause. */
+  public ZooKeeperException(ErrorCode code, String msg, Throwable th) {
+    super(code, msg, th);
+  }
+
+}
Modified: lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java?rev=1022188&r1=1022187&r2=1022188&view=diff
==============================================================================
--- lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/trunk/solr/src/common/org/apache/solr/common/params/CoreAdminParams.java Wed Oct 13 17:01:13 2010
@@ -59,6 +59,12 @@ public interface CoreAdminParams
* The directories are specified by multiple indexDir parameters. */
public final static String INDEX_DIR = "indexDir";
+ /** The collection name in solr cloud */
+ public final static String COLLECTION = "collection";
+
+ /** The shard id in solr cloud */
+ public final static String SHARD = "shard";
+
public enum CoreAdminAction {
STATUS,
LOAD,
Added: lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java (added)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/CloudDescriptor.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,52 @@
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.params.SolrParams;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Mutable holder for the cloud related settings of a core: its shard id, its
+ * collection name and optional creation parameters.
+ */
+public class CloudDescriptor {
+  private String shardId;
+  private String collectionName;
+  private SolrParams params;
+
+  /** @return the shard id, or null if none has been set */
+  public String getShardId() {
+    return this.shardId;
+  }
+
+  public void setShardId(String shardId) {
+    this.shardId = shardId;
+  }
+
+  /** @return the collection name, or null if none has been set */
+  public String getCollectionName() {
+    return this.collectionName;
+  }
+
+  public void setCollectionName(String collectionName) {
+    this.collectionName = collectionName;
+  }
+
+  /** Optional parameters that can change how a core is created. */
+  public SolrParams getParams() {
+    return this.params;
+  }
+
+  public void setParams(SolrParams params) {
+    this.params = params;
+  }
+}
Added: lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java?rev=1022188&view=auto
==============================================================================
--- lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java (added)
+++ lucene/dev/trunk/solr/src/java/org/apache/solr/cloud/SolrZkServer.java Wed Oct 13 17:01:13 2010
@@ -0,0 +1,477 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Runs a ZooKeeper server inside the current JVM on a daemon thread when
+ * {@code zkRun} is set, and reports the connect string clients should use.
+ * <p>
+ * {@link #start()} reads the configuration held in {@code zkProps}, which is
+ * populated by {@link #parseConfig()}; call {@link #parseConfig()} first.
+ */
+public class SolrZkServer {
+  static org.slf4j.Logger log = LoggerFactory.getLogger(SolrZkServer.class);
+
+  String zkRun;
+  String zkHost;
+  String solrHome;
+  String solrPort;
+  Properties props;
+  SolrZkServerProps zkProps;
+
+  private Thread zkThread;  // the thread running a zookeeper server, only if zkRun is set
+
+  /**
+   * @param zkRun    non-null to run an embedded server; may carry a
+   *                 {@code host:port} value
+   * @param zkHost   comma separated {@code host:port} list of the ZooKeeper
+   *                 servers to connect to, or null
+   * @param solrHome directory in which {@code zoo.cfg} is looked up and
+   *                 under which the default {@code zoo_data} dir is placed
+   * @param solrPort port Solr listens on, as a decimal string; used to
+   *                 derive default ZooKeeper ports
+   */
+  public SolrZkServer(String zkRun, String zkHost, String solrHome, String solrPort) {
+    this.zkRun = zkRun;
+    this.zkHost = zkHost;
+    this.solrHome = solrHome;
+    this.solrPort = solrPort;
+  }
+
+  /**
+   * Returns the ZooKeeper connect string.
+   *
+   * @return {@code zkHost} when it was given; otherwise
+   *         {@code "localhost:" + clientPort} of the embedded server when
+   *         {@code zkRun} is set and a configuration has been parsed;
+   *         otherwise null
+   */
+  public String getClientString() {
+    if (zkHost != null) {
+      return zkHost;
+    }
+
+    // if the string wasn't passed as zkHost, then use the standalone server we started
+    if (zkProps == null || zkRun == null) {
+      return null;
+    }
+    return "localhost:" + zkProps.getClientPortAddress().getPort();
+  }
+
+  /**
+   * Loads {@code zoo.cfg} from the solr home, injects server entries derived
+   * from {@code zkHost} when the file declares none, and parses the result.
+   * When no client port results from that, it defaults to solrPort + 1000.
+   * <p>
+   * Configuration and I/O errors are rethrown as {@link SolrException} only
+   * when {@code zkRun} is set; without an embedded server they are ignored.
+   */
+  public void parseConfig() {
+    if (zkProps == null) {
+      zkProps = new SolrZkServerProps();
+      // set default data dir
+      // TODO: use something based on IP+port???  support ensemble all from same solr home?
+      zkProps.setDataDir(solrHome + '/' + "zoo_data");
+      zkProps.zkRun = zkRun;
+      zkProps.solrPort = solrPort;
+    }
+
+    try {
+      props = SolrZkServerProps.getProperties(solrHome + '/' + "zoo.cfg");
+      SolrZkServerProps.injectServers(props, zkRun, zkHost);
+      zkProps.parseProperties(props);
+      if (zkProps.getClientPortAddress() == null) {
+        zkProps.setClientPort(Integer.parseInt(solrPort) + 1000);
+      }
+    } catch (QuorumPeerConfig.ConfigException e) {
+      if (zkRun != null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    } catch (IOException e) {
+      if (zkRun != null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      }
+    }
+  }
+
+  /** @return the servers known to the parsed configuration, keyed by server id */
+  public Map<Long, QuorumPeer.QuorumServer> getServers() {
+    return zkProps.getServers();
+  }
+
+  /**
+   * Starts the embedded ZooKeeper server on a daemon thread; does nothing
+   * when {@code zkRun} is null. A quorum peer is run when more than one
+   * server is configured, a standalone server otherwise. The call then
+   * pauses briefly to give the server time to come up.
+   */
+  public void start() {
+    if (zkRun == null) {
+      return;
+    }
+
+    zkThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          if (zkProps.getServers().size() > 1) {
+            QuorumPeerMain zkServer = new QuorumPeerMain();
+            zkServer.runFromConfig(zkProps);
+          } else {
+            ServerConfig sc = new ServerConfig();
+            sc.readFrom(zkProps);
+            ZooKeeperServerMain zkServer = new ZooKeeperServerMain();
+            zkServer.runFromConfig(sc);
+          }
+          log.info("ZooKeeper Server exited.");
+        } catch (Throwable e) {
+          log.error("ZooKeeper Server ERROR", e);
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        }
+      }
+    };
+
+    if (zkProps.getServers().size() > 1) {
+      log.info("STARTING EMBEDDED ENSEMBLE ZOOKEEPER SERVER at port " + zkProps.getClientPortAddress().getPort());
+    } else {
+      log.info("STARTING EMBEDDED STANDALONE ZOOKEEPER SERVER at port " + zkProps.getClientPortAddress().getPort());
+    }
+
+    zkThread.setDaemon(true);
+    zkThread.start();
+    try {
+      Thread.sleep(500); // pause for ZooKeeper to start
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the caller instead of swallowing it
+      Thread.currentThread().interrupt();
+      log.error("STARTING ZOOKEEPER", e);
+    }
+  }
+
+  /**
+   * Interrupts the server thread. Does nothing when {@code zkRun} is null or
+   * when {@link #start()} has not created the thread yet.
+   */
+  public void stop() {
+    if (zkRun == null) {
+      return;
+    }
+    if (zkThread != null) {
+      zkThread.interrupt();
+    }
+  }
+}
+
+
+
+
+/**
+ * A {@link QuorumPeerConfig} that can be filled in programmatically.
+ * Allows us to set a default for the data dir before parsing
+ * zoo.cfg (which validates that there is a dataDir), and to derive the
+ * server id from zkRun / the Solr port when no myid file exists.
+ */
+class SolrZkServerProps extends QuorumPeerConfig {
+  // NOTE(review): logs under QuorumPeerConfig's category rather than this
+  // class's own - presumably because parseProperties is copied from
+  // ZooKeeper; confirm this is intended.
+  protected static org.slf4j.Logger LOG = LoggerFactory.getLogger(QuorumPeerConfig.class);
+
+  String solrPort; // port that Solr is listening on
+  String zkRun;
+
+  /**
+   * Load a ZooKeeper configuration file into a {@link Properties} object.
+   * @param path the path of the configuration file
+   * @return the properties read from the file
+   * @throws ConfigException if the file is missing or cannot be read
+   */
+  public static Properties getProperties(String path) throws ConfigException {
+    File configFile = new File(path);
+
+    LOG.info("Reading configuration from: " + configFile);
+
+    try {
+      if (!configFile.exists()) {
+        throw new IllegalArgumentException(configFile.toString()
+            + " file is missing");
+      }
+
+      Properties cfg = new Properties();
+      FileInputStream in = new FileInputStream(configFile);
+      try {
+        cfg.load(in);
+      } finally {
+        in.close();
+      }
+
+      return cfg;
+
+    } catch (IOException e) {
+      throw new ConfigException("Error processing " + path, e);
+    } catch (IllegalArgumentException e) {
+      throw new ConfigException("Error processing " + path, e);
+    }
+  }
+
+
+  /**
+   * Fills in defaults that can be derived from the command line settings.
+   * <p>
+   * Sets clientPort from the port part of zkRun if clientPort is not already
+   * set. Adds server.x entries if none exist, based on zkHost if it does
+   * exist. Given zkHost=localhost:1111,localhost:2222 this will inject
+   * <pre>
+   * server.0=localhost:1112:1113
+   * server.1=localhost:2223:2224
+   * </pre>
+   * The second (election) port is left out when electionAlg is 0.
+   *
+   * @param props  properties to update in place
+   * @param zkRun  {@code host:port} of the embedded server, or null
+   * @param zkHost comma separated {@code host:port} list, or null
+   * @throws NumberFormatException if electionAlg or a zkHost port is not a number
+   */
+  public static void injectServers(Properties props, String zkRun, String zkHost) {
+
+    // if clientPort not already set, use zkRun
+    if (zkRun != null && props.getProperty("clientPort") == null) {
+      int portIdx = zkRun.lastIndexOf(':');
+      if (portIdx > 0) {
+        String portStr = zkRun.substring(portIdx + 1);
+        props.setProperty("clientPort", portStr);
+      }
+    }
+
+    boolean hasServers = hasServers(props);
+
+    if (!hasServers && zkHost != null) {
+      int alg = Integer.parseInt(props.getProperty("electionAlg", "3").trim());
+      String[] hosts = zkHost.split(",");
+      int serverNum = 0;
+      for (String hostAndPort : hosts) {
+        hostAndPort = hostAndPort.trim();
+        int portIdx = hostAndPort.lastIndexOf(':');
+        String clientPortStr = hostAndPort.substring(portIdx + 1);
+        int clientPort = Integer.parseInt(clientPortStr);
+        String host = hostAndPort.substring(0, portIdx);
+
+        String serverStr = host + ':' + (clientPort + 1);
+        // zk leader election algorithms other than 0 need an extra port for leader election.
+        if (alg != 0) {
+          serverStr = serverStr + ':' + (clientPort + 2);
+        }
+
+        props.setProperty("server." + serverNum, serverStr);
+        serverNum++;
+      }
+    }
+  }
+
+  /**
+   * @param props properties to inspect
+   * @return true if any key starts with {@code "server."}
+   */
+  public static boolean hasServers(Properties props) {
+    for (Object key : props.keySet()) {
+      if (((String) key).startsWith("server.")) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Works out which configured server this node is. Called by the modified
+   * version of parseProperties when the myid file is missing.
+   * <p>
+   * The host is taken from zkRun (default "localhost"). If exactly one
+   * configured server has that host it is assumed to be this node and the
+   * client port is set to that server's port - 1. If several match, the one
+   * whose port equals the zkRun port + 1 (or solrPort + 1001 when zkRun
+   * carries no port) is chosen, and the client port is only derived from it
+   * when no client port has been set yet.
+   *
+   * @return the id of the matching server, or null if it cannot be determined
+   * @throws NumberFormatException if the port in zkRun or solrPort is not a number
+   */
+  public Long getMySeverId() {
+    if (zkRun == null && solrPort == null) {
+      return null;
+    }
+
+    Map<Long, QuorumPeer.QuorumServer> slist = getServers();
+
+    String myHost = "localhost";
+    int myServerPort = -1; // quorum port expected for this node; -1 if unknown
+
+    if (zkRun != null && zkRun.length() > 0) {
+      String parts[] = zkRun.split(":");
+      if (parts.length > 0) {
+        myHost = parts[0];
+      }
+      // zkRun may be given without a port; only use it when present
+      if (parts.length > 1) {
+        myServerPort = Integer.parseInt(parts[1]) + 1;
+      }
+    }
+    if (myServerPort < 0 && solrPort != null) {
+      // default to <solrPort+1001>
+      myServerPort = Integer.parseInt(solrPort) + 1001;
+    }
+
+    // first try a straight match by host
+    Long me = null;
+    boolean multiple = false;
+    int port = 0;
+    for (QuorumPeer.QuorumServer server : slist.values()) {
+      if (server.addr.getHostName().equals(myHost)) {
+        multiple = me != null;
+        me = server.id;
+        port = server.addr.getPort();
+      }
+    }
+
+    if (me == null) {
+      // no hosts matched. This must be checked before the single-match case:
+      // otherwise setClientPort(-1) would be attempted and rejected.
+      return null;
+    }
+
+    if (!multiple) {
+      // only one host matched... assume it's me.
+      setClientPort(port - 1);
+      return me;
+    }
+
+    // multiple matches... try to figure out by port.
+    if (myServerPort < 0) {
+      return null;
+    }
+    InetSocketAddress thisAddr = new InetSocketAddress(myHost, myServerPort);
+    for (QuorumPeer.QuorumServer server : slist.values()) {
+      if (server.addr.equals(thisAddr)) {
+        // only derive the client port when none has been configured
+        if (clientPortAddress == null || clientPortAddress.getPort() <= 0) {
+          setClientPort(server.addr.getPort() - 1);
+        }
+        return server.id;
+      }
+    }
+
+    return null;
+  }
+
+
+  /** @param dataDir directory to use as the ZooKeeper data dir */
+  public void setDataDir(String dataDir) {
+    this.dataDir = dataDir;
+  }
+
+  /**
+   * Sets the client port, keeping the host of the current client address
+   * when there is one.
+   *
+   * @param clientPort port clients connect to
+   * @throws RuntimeException if the current host name can no longer be resolved
+   * @throws IllegalArgumentException if the port is outside the valid range
+   */
+  public void setClientPort(int clientPort) {
+    if (clientPortAddress != null) {
+      try {
+        this.clientPortAddress = new InetSocketAddress(
+            InetAddress.getByName(clientPortAddress.getHostName()), clientPort);
+      } catch (UnknownHostException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      this.clientPortAddress = new InetSocketAddress(clientPort);
+    }
+  }
+
+
+  // NOTE: copied from ZooKeeper 3.2
+  /**
+   * Parse config from a Properties.
+   * <p>
+   * Unrecognised keys are exported as {@code zookeeper.<key>} system
+   * properties. When more than one server is configured and the myid file
+   * is missing, the server id is derived via {@link #getMySeverId()}; if
+   * that fails and zkRun is null the id is simply left unset.
+   *
+   * @param zkProp Properties to parse from.
+   * @throws java.io.IOException if the myid file cannot be read
+   * @throws ConfigException if a server entry is malformed or the group
+   *         configuration is inconsistent
+   * @throws IllegalArgumentException if a required setting is missing or
+   *         a numeric value cannot be parsed
+   */
+  public void parseProperties(Properties zkProp)
+      throws IOException, ConfigException {
+    for (Entry<Object, Object> entry : zkProp.entrySet()) {
+      String key = entry.getKey().toString().trim();
+      String value = entry.getValue().toString().trim();
+      if (key.equals("dataDir")) {
+        dataDir = value;
+      } else if (key.equals("dataLogDir")) {
+        dataLogDir = value;
+      } else if (key.equals("clientPort")) {
+        setClientPort(Integer.parseInt(value));
+      } else if (key.equals("tickTime")) {
+        tickTime = Integer.parseInt(value);
+      } else if (key.equals("initLimit")) {
+        initLimit = Integer.parseInt(value);
+      } else if (key.equals("syncLimit")) {
+        syncLimit = Integer.parseInt(value);
+      } else if (key.equals("electionAlg")) {
+        electionAlg = Integer.parseInt(value);
+      } else if (key.equals("maxClientCnxns")) {
+        maxClientCnxns = Integer.parseInt(value);
+      } else if (key.startsWith("server.")) {
+        int dot = key.indexOf('.');
+        long sid = Long.parseLong(key.substring(dot + 1));
+        String parts[] = value.split(":");
+        if ((parts.length != 2) && (parts.length != 3)) {
+          // reject the entry outright; indexing parts[1] below would fail
+          // for a value without any port
+          String msg = value
+              + " does not have the form host:port or host:port:port";
+          LOG.error(msg);
+          throw new ConfigException(msg);
+        }
+        InetSocketAddress addr = new InetSocketAddress(parts[0],
+            Integer.parseInt(parts[1]));
+        if (parts.length == 2) {
+          servers.put(Long.valueOf(sid), new QuorumPeer.QuorumServer(sid, addr));
+        } else {
+          InetSocketAddress electionAddr = new InetSocketAddress(
+              parts[0], Integer.parseInt(parts[2]));
+          servers.put(Long.valueOf(sid), new QuorumPeer.QuorumServer(sid, addr,
+              electionAddr));
+        }
+      } else if (key.startsWith("group")) {
+        int dot = key.indexOf('.');
+        long gid = Long.parseLong(key.substring(dot + 1));
+
+        numGroups++;
+
+        String parts[] = value.split(":");
+        for (String s : parts) {
+          long sid = Long.parseLong(s);
+          if (serverGroup.containsKey(sid)) {
+            throw new ConfigException("Server " + sid + " is in multiple groups");
+          } else {
+            serverGroup.put(sid, gid);
+          }
+        }
+
+      } else if (key.startsWith("weight")) {
+        int dot = key.indexOf('.');
+        long sid = Long.parseLong(key.substring(dot + 1));
+        serverWeight.put(sid, Long.parseLong(value));
+      } else {
+        System.setProperty("zookeeper." + key, value);
+      }
+    }
+    if (dataDir == null) {
+      throw new IllegalArgumentException("dataDir is not set");
+    }
+    if (dataLogDir == null) {
+      dataLogDir = dataDir;
+    } else {
+      if (!new File(dataLogDir).isDirectory()) {
+        throw new IllegalArgumentException("dataLogDir " + dataLogDir
+            + " is missing.");
+      }
+    }
+
+    if (tickTime == 0) {
+      throw new IllegalArgumentException("tickTime is not set");
+    }
+    if (servers.size() > 1) {
+      if (initLimit == 0) {
+        throw new IllegalArgumentException("initLimit is not set");
+      }
+      if (syncLimit == 0) {
+        throw new IllegalArgumentException("syncLimit is not set");
+      }
+      /*
+       * If using FLE, then every server requires a separate election
+       * port.
+       */
+      if (electionAlg != 0) {
+        for (QuorumPeer.QuorumServer s : servers.values()) {
+          if (s.electionAddr == null) {
+            throw new IllegalArgumentException(
+                "Missing election port for server: " + s.id);
+          }
+        }
+      }
+
+      /*
+       * Default of quorum config is majority
+       */
+      if (serverGroup.size() > 0) {
+        if (servers.size() != serverGroup.size()) {
+          throw new ConfigException("Every server must be in exactly one group");
+        }
+        /*
+         * The default weight of a server is 1
+         */
+        for (QuorumPeer.QuorumServer s : servers.values()) {
+          if (!serverWeight.containsKey(s.id)) {
+            serverWeight.put(s.id, (long) 1);
+          }
+        }
+
+        /*
+         * Set the quorumVerifier to be QuorumHierarchical
+         */
+        quorumVerifier = new QuorumHierarchical(numGroups,
+            serverWeight, serverGroup);
+      } else {
+        /*
+         * The default QuorumVerifier is QuorumMaj
+         */
+
+        LOG.info("Defaulting to majority quorums");
+        quorumVerifier = new QuorumMaj(servers.size());
+      }
+
+      File myIdFile = new File(dataDir, "myid");
+      if (!myIdFile.exists()) {
+        ///////////////// ADDED FOR SOLR //////
+        Long myid = getMySeverId();
+        if (myid != null) {
+          serverId = myid;
+          return;
+        }
+        if (zkRun == null) return;
+        //////////////// END ADDED FOR SOLR //////
+        throw new IllegalArgumentException(myIdFile.toString()
+            + " file is missing");
+      }
+
+      BufferedReader br = new BufferedReader(new FileReader(myIdFile));
+      String myIdString;
+      try {
+        myIdString = br.readLine();
+      } finally {
+        br.close();
+      }
+      try {
+        serverId = Long.parseLong(myIdString);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("serverid " + myIdString
+            + " is not a number");
+      }
+    }
+  }
+
+
+}