Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC
svn commit: r1235888 [2/12] - in /lucene/dev/trunk: dev-tools/eclipse/
dev-tools/maven/ solr/ solr/cloud-dev/
solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/
solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,450 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
+import org.apache.solr.cloud.ShardLeaderWatcher.ShardLeaderListener;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.CoreState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkOperation;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cluster leader. Responsible for node assignments and for maintaining the cluster state file.
+ */
+public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
+
+ public static final String ASSIGNMENTS_NODE = "/node_assignments";
+ public static final String STATES_NODE = "/node_states";
+ private static Logger log = LoggerFactory.getLogger(Overseer.class);
+
+ private final SolrZkClient zkClient;
+ private final ZkStateReader reader;
+
+ // node stateWatches
+ private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
+
+ // shard leader watchers (collection->slice->watcher)
+ private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
+ private ZkCmdExecutor zkCmdExecutor;
+
+ public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
+ log.info("Constructing new Overseer");
+ this.zkClient = zkClient;
+ this.zkCmdExecutor = new ZkCmdExecutor();
+ this.reader = reader;
+ createWatches();
+ }
+
+ public synchronized void createWatches()
+ throws KeeperException, InterruptedException {
+ addCollectionsWatch();
+ addLiveNodesWatch();
+ }
+
+ /*
+ * Watch for collections so we can add watches for their shard leaders.
+ */
+ private void addCollectionsWatch() throws KeeperException,
+ InterruptedException {
+
+ zkCmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
+
+ List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this, true);
+ collectionsChanged(collections);
+ } catch (KeeperException e) {
+ if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.warn("", e);
+ }
+ }
+ }, true);
+
+ collectionsChanged(collections);
+ }
+
+ private void collectionsChanged(Collection<String> collections) throws KeeperException, InterruptedException {
+ synchronized (shardLeaderWatches) {
+ for(String collection: collections) {
+ if(!shardLeaderWatches.containsKey(collection)) {
+ shardLeaderWatches.put(collection, new HashMap<String,ShardLeaderWatcher>());
+ addShardLeadersWatch(collection);
+ }
+ }
+ //XXX not handling collection deletion yet
+ }
+ }
+
+ /**
+ * Add a watch for the node containing shard leaders for a collection.
+ * @param collection
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private void addShardLeadersWatch(final String collection) throws KeeperException,
+ InterruptedException {
+
+ zkCmdExecutor.ensureExists(ZkStateReader.getShardLeadersPath(collection, null), zkClient);
+
+ final List<String> leaderNodes = zkClient.getChildren(
+ ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ List<String> leaderNodes = zkClient.getChildren(
+ ZkStateReader.getShardLeadersPath(collection, null), this, true);
+
+ processLeaderNodesChanged(collection, leaderNodes);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ SolrException.log(log, "", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ }
+ }, true);
+
+ processLeaderNodesChanged(collection, leaderNodes);
+ }
+
+ /**
+ * Process change in shard leaders. Make sure we have watches for each leader.
+ */
+ private void processLeaderNodesChanged(final String collection, final Collection<String> shardIds) {
+ if(log.isInfoEnabled()) {
+ log.info("Leader nodes changed for collection: " + collection + " nodes now:" + shardIds);
+ }
+
+ Map<String, ShardLeaderWatcher> watches = shardLeaderWatches.get(collection);
+ Set<String> currentWatches = new HashSet<String>();
+ currentWatches.addAll(watches.keySet());
+
+ Set<String> newLeaders = complement(shardIds, currentWatches);
+
+ Set<String> lostLeaders = complement(currentWatches, shardIds);
+ //remove watches for lost shards
+ for (String shardId : lostLeaders) {
+ ShardLeaderWatcher watcher = watches.remove(shardId);
+ if (watcher != null) {
+ watcher.close();
+ announceLeader(collection, shardId, new ZkCoreNodeProps(new ZkNodeProps())); //removes leader for shard
+ }
+ }
+
+ //add watches for the new shards
+ for(String shardId: newLeaders) {
+ try {
+ ShardLeaderWatcher watcher = new ShardLeaderWatcher(shardId, collection, zkClient, this);
+ watches.put(shardId, watcher);
+ } catch (KeeperException e) {
+ log.error("Failed to create watcher for shard leader col:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Failed to create watcher for shard leader col:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
+ }
+ }
+ }
+
+ private void addLiveNodesWatch() throws KeeperException,
+ InterruptedException {
+ List<String> liveNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
+
+ @Override
+ public Object execute() throws KeeperException, InterruptedException {
+ return zkClient.getChildren(
+ ZkStateReader.LIVE_NODES_ZKNODE, new Watcher() {
+
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ List<String> liveNodes = zkClient.getChildren(
+ ZkStateReader.LIVE_NODES_ZKNODE, this, true);
+ Set<String> liveNodesSet = new HashSet<String>();
+ liveNodesSet.addAll(liveNodes);
+ processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ SolrException.log(log, "", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ }
+ }
+ }, true);
+ }
+ });
+
+ processLiveNodesChanged(Collections.<String>emptySet(), liveNodes);
+ }
+
+ private void processLiveNodesChanged(Collection<String> oldLiveNodes,
+ Collection<String> liveNodes) throws InterruptedException, KeeperException {
+
+ Set<String> upNodes = complement(liveNodes, oldLiveNodes);
+ if (upNodes.size() > 0) {
+ addNodeStateWatches(upNodes);
+ }
+
+ Set<String> downNodes = complement(oldLiveNodes, liveNodes);
+ for(String node: downNodes) {
+ NodeStateWatcher watcher = nodeStateWatches.remove(node);
+ }
+ }
+
+ private void addNodeStateWatches(Set<String> nodeNames) throws InterruptedException, KeeperException {
+
+ for (String nodeName : nodeNames) {
+ final String path = STATES_NODE + "/" + nodeName;
+ synchronized (nodeStateWatches) {
+ if (!nodeStateWatches.containsKey(nodeName)) {
+ zkCmdExecutor.ensureExists(path, zkClient);
+ nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
+ } else {
+ log.debug("watch already added");
+ }
+ }
+ }
+ }
+
+ /**
+ * Try to assign the core to the cluster.
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
+ String collection = coreState.getCollectionName();
+ String zkCoreNodeName = coreState.getCoreNodeName();
+
+ String shardId;
+ if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
+ shardId = AssignShard.assignShard(collection, state);
+ } else {
+ shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
+ }
+
+ Map<String,String> props = new HashMap<String,String>();
+ for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ ZkNodeProps zkProps = new ZkNodeProps(props);
+ Slice slice = state.getSlice(collection, shardId);
+ Map<String,ZkNodeProps> shardProps;
+ if (slice == null) {
+ shardProps = new HashMap<String,ZkNodeProps>();
+ } else {
+ shardProps = state.getSlice(collection, shardId).getShardsCopy();
+ }
+ shardProps.put(zkCoreNodeName, zkProps);
+
+ slice = new Slice(shardId, shardProps);
+ CloudState newCloudState = updateSlice(state, collection, slice);
+ return newCloudState;
+ }
+
+ private Set<String> complement(Collection<String> next,
+ Collection<String> prev) {
+ Set<String> downCollections = new HashSet<String>();
+ downCollections.addAll(next);
+ downCollections.removeAll(prev);
+ return downCollections;
+ }
+
+ @Override
+ public void coreChanged(final String nodeName, final Set<CoreState> states) throws KeeperException, InterruptedException {
+ log.debug("Cores changed: " + nodeName + " states:" + states);
+ synchronized(reader.getUpdateLock()) {
+ reader.updateCloudState(true);
+ CloudState cloudState = reader.getCloudState();
+ for (CoreState state : states) {
+ cloudState = updateState(cloudState, nodeName, state);
+ }
+
+ try {
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(cloudState), true);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Interrupted while publishing new state", e);
+ }
+ }
+ }
+
+ public static void createClientNodes(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
+ final String node = STATES_NODE + "/" + nodeName;
+ if (log.isInfoEnabled()) {
+ log.info("creating node:" + node);
+ }
+
+ ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
+ zkCmdExecutor.ensureExists(node, zkClient);
+ }
+
+ private CloudState updateSlice(CloudState state, String collection, Slice slice) {
+
+ final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+ newStates.putAll(state.getCollectionStates());
+
+ if (!newStates.containsKey(collection)) {
+ newStates.put(collection, new LinkedHashMap<String,Slice>());
+ }
+
+ final Map<String, Slice> slices = newStates.get(collection);
+ if (!slices.containsKey(slice.getName())) {
+ slices.put(slice.getName(), slice);
+ } else {
+ final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
+ final Slice existingSlice = slices.get(slice.getName());
+ shards.putAll(existingSlice.getShards());
+ //XXX preserve existing leader
+ for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
+ if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
+ HashMap<String, String> newProps = new HashMap<String,String>();
+ newProps.putAll(edit.getValue().getProperties());
+ newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
+ shards.put(edit.getKey(), new ZkNodeProps(newProps));
+ } else {
+ shards.put(edit.getKey(), edit.getValue());
+ }
+ }
+ final Slice updatedSlice = new Slice(slice.getName(), shards);
+ slices.put(slice.getName(), updatedSlice);
+ }
+ return new CloudState(state.getLiveNodes(), newStates);
+ }
+
+ private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
+
+ boolean updated = false;
+ final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+ newStates.putAll(state.getCollectionStates());
+
+ final Map<String, Slice> slices = newStates.get(collection);
+
+ if(slices==null) {
+ log.error("Could not mark shard leader for non existing collection.");
+ return state;
+ }
+
+ if (!slices.containsKey(sliceName)) {
+ log.error("Could not mark leader for non existing slice.");
+ return state;
+ } else {
+ final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
+ for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
+ Map<String, String> newShardProps = new LinkedHashMap<String,String>();
+ newShardProps.putAll(shard.getValue().getProperties());
+
+ String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP); //clear any previously existing flag
+
+ ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
+ if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
+ newShardProps.put(ZkStateReader.LEADER_PROP,"true");
+ if (wasLeader == null) {
+ updated = true;
+ }
+ } else {
+ if (wasLeader != null) {
+ updated = true;
+ }
+ }
+ newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
+ }
+ Slice slice = new Slice(sliceName, newShards);
+ slices.put(sliceName, slice);
+ }
+ if (updated) {
+ return new CloudState(state.getLiveNodes(), newStates);
+ } else {
+ return state;
+ }
+ }
+
+ @Override
+ public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
+ synchronized (reader.getUpdateLock()) {
+ try {
+ reader.updateCloudState(true); // get fresh copy of the state
+ final CloudState state = reader.getCloudState();
+ final CloudState newState = setShardLeader(state, collection, shardId,
+ props.getCoreUrl());
+ if (state != newState) { // if same instance was returned no need to
+ // update state
+ log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
+ zkClient.setData(ZkStateReader.CLUSTER_STATE,
+ ZkStateReader.toJSON(newState), true);
+
+ } else {
+ log.debug("State was not changed.");
+ }
+ } catch (KeeperException e) {
+ log.warn("Could not announce new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Could not promote new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
+ }
+ }
+ }
+
+}
\ No newline at end of file
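The Overseer above decides which shard-leader watches to add or drop by taking two set differences (complement()) between the shard ids it already watches and the ids just read from ZooKeeper. A minimal standalone sketch of that bookkeeping, separate from the commit; the class name and shard ids below are illustrative only:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: mirrors Overseer.complement() on two small shard-id sets.
public class WatchDiffSketch {

  static Set<String> complement(Set<String> next, Set<String> prev) {
    Set<String> result = new HashSet<String>(next);
    result.removeAll(prev);
    return result;
  }

  public static void main(String[] args) {
    Set<String> watched = new HashSet<String>(Arrays.asList("shard1", "shard2"));
    Set<String> fromZk = new HashSet<String>(Arrays.asList("shard2", "shard3"));

    // new leaders that need a ShardLeaderWatcher
    System.out.println("add watches for:  " + complement(fromZk, watched));  // [shard3]
    // leaders that disappeared and whose watchers should be closed
    System.out.println("drop watches for: " + complement(watched, fromZk));  // [shard1]
  }
}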
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,272 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.UpdateLog.RecoveryInfo;
+import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RecoveryStrategy extends Thread {
+ private static final int MAX_RETRIES = 100;
+ private static final int INTERRUPTED = 101;
+ private static final int START_TIMEOUT = 100;
+
+ private static final String REPLICATION_HANDLER = "/replication";
+
+ private static Logger log = LoggerFactory.getLogger(RecoveryStrategy.class);
+
+ private volatile boolean close = false;
+
+ private ZkController zkController;
+ private String baseUrl;
+ private String coreZkNodeName;
+ private ZkStateReader zkStateReader;
+ private volatile String coreName;
+ private int retries;
+ private SolrCore core;
+
+ public RecoveryStrategy(SolrCore core) {
+ this.core = core;
+ this.coreName = core.getName();
+
+ zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+ zkStateReader = zkController.getZkStateReader();
+ baseUrl = zkController.getBaseUrl();
+ coreZkNodeName = zkController.getNodeName() + "_" + coreName;
+
+ }
+
+ // make sure any threads stop retrying
+ public void close() {
+ close = true;
+ interrupt();
+ }
+
+
+ private void recoveryFailed(final SolrCore core,
+ final ZkController zkController, final String baseUrl,
+ final String shardZkNodeName, final CoreDescriptor cd) {
+ SolrException.log(log, "Recovery failed - I give up.");
+ zkController.publishAsRecoveryFailed(baseUrl, cd,
+ shardZkNodeName, core.getName());
+ close = true;
+ }
+
+ private void replicate(String nodeName, SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
+ throws SolrServerException, IOException {
+ // start buffering updates to the transaction log
+ // and do recovery - either replay via realtime get (eventually)
+ // or full index replication
+
+ String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
+ ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
+ String leaderUrl = leaderCNodeProps.getCoreUrl();
+ String leaderCoreName = leaderCNodeProps.getCoreName();
+
+ log.info("Attempt to replicate from " + leaderUrl);
+
+ // if we are the leader, either we are trying to recover faster
+ // than our ephemeral timed out or we are the only node
+ if (!leaderBaseUrl.equals(baseUrl)) {
+
+ CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderBaseUrl);
+ server.setSoTimeout(15000);
+ PrepRecovery prepCmd = new PrepRecovery();
+ prepCmd.setCoreName(leaderCoreName);
+ prepCmd.setNodeName(nodeName);
+ prepCmd.setCoreNodeName(shardZkNodeName);
+
+ server.request(prepCmd);
+ server.shutdown();
+
+ // use rep handler directly, so we can do this sync rather than async
+ SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
+ if (handler instanceof LazyRequestHandlerWrapper) {
+ handler = ((LazyRequestHandlerWrapper)handler).getWrappedHandler();
+ }
+ ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+
+ if (replicationHandler == null) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+ "Skipping recovery, no " + REPLICATION_HANDLER + " handler found");
+ }
+
+ ModifiableSolrParams solrParams = new ModifiableSolrParams();
+ solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
+
+ if (close) retries = INTERRUPTED;
+ boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure force=true does not download files we already have
+
+ if (!success) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
+ }
+
+ // solrcloud_debug
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replicated "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " from " + leaderUrl + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" + core.getDataDir());
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
+ }
+ }
+
+ @Override
+ public void run() {
+ boolean replayed = false;
+ boolean succesfulRecovery = false;
+
+ while (!succesfulRecovery && !close && !isInterrupted()) {
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (ulog == null) return;
+
+ ulog.bufferUpdates();
+ replayed = false;
+ CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+ try {
+ zkController.publish(core, ZkStateReader.RECOVERING);
+
+ ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
+ cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+ // System.out.println("recover " + shardZkNodeName + " against " +
+ // leaderprops);
+ replicate(zkController.getNodeName(), core, coreZkNodeName,
+ leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, coreName));
+
+ replay(ulog);
+ replayed = true;
+
+ // if there are pending recovery requests, don't advertise as active
+ zkController.publishAsActive(baseUrl, core.getCoreDescriptor(), coreZkNodeName,
+ coreName);
+
+ succesfulRecovery = true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Recovery was interrupted", e);
+ retries = INTERRUPTED;
+ } catch (Throwable t) {
+ SolrException.log(log, "Error while trying to recover", t);
+ } finally {
+ if (!replayed) {
+ try {
+ ulog.dropBufferedUpdates();
+ } catch (Throwable t) {
+ SolrException.log(log, "", t);
+ }
+ }
+
+ }
+
+ if (!succesfulRecovery) {
+ // let's pause for a moment before we try again...
+ // TODO: we don't want to retry for some problems?
+ // Or do a fall off retry...
+ try {
+
+ SolrException.log(log, "Recovery failed - trying again...");
+ retries++;
+ if (retries >= MAX_RETRIES) {
+ if (retries == INTERRUPTED) {
+
+ } else {
+ // TODO: for now, give up after MAX_RETRIES tries - should we do more?
+ recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+ core.getCoreDescriptor());
+ }
+ break;
+ }
+
+ } catch (Exception e) {
+ SolrException.log(log, "", e);
+ }
+
+ try {
+ Thread.sleep(Math.min(START_TIMEOUT * retries, 60000));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Recovery was interrupted", e);
+ retries = INTERRUPTED;
+ }
+ }
+
+ log.info("Finished recovery process");
+
+ }
+ }
+
+ private Future<RecoveryInfo> replay(UpdateLog ulog)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ Future<RecoveryInfo> future = ulog.applyBufferedUpdates();
+ if (future == null) {
+ // no replay needed
+ log.info("No replay needed");
+ } else {
+ // wait for replay
+ future.get();
+ }
+
+ // solrcloud_debug
+// try {
+// RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+// SolrIndexSearcher searcher = searchHolder.get();
+// try {
+// System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replayed "
+// + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+// } finally {
+// searchHolder.decref();
+// }
+// } catch (Exception e) {
+//
+// }
+
+ return future;
+ }
+
+}
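Between failed attempts, RecoveryStrategy.run() above sleeps for min(START_TIMEOUT * retries, 60000) milliseconds, a linearly growing delay capped at one minute. A minimal sketch of that backoff, assuming the same constants; the class name and printed attempts are illustrative only:

// Illustrative only: reproduces the sleep calculation from RecoveryStrategy.run().
public class BackoffSketch {

  private static final int START_TIMEOUT = 100;   // ms, as in RecoveryStrategy
  private static final int MAX_SLEEP = 60000;     // ms, the cap used in run()

  static long delayMillis(int retries) {
    return Math.min((long) START_TIMEOUT * retries, MAX_SLEEP);
  }

  public static void main(String[] args) {
    for (int attempt = 1; attempt <= 5; attempt++) {
      System.out.println("attempt " + attempt + " -> sleep " + delayMillis(attempt) + " ms");
    }
    // large retry counts stay capped at one minute
    System.out.println("attempt 1000 -> sleep " + delayMillis(1000) + " ms");
  }
}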
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,90 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A watcher for a shard leader.
+ */
+public class ShardLeaderWatcher implements Watcher {
+ private static Logger logger = LoggerFactory.getLogger(ShardLeaderWatcher.class);
+ static interface ShardLeaderListener {
+ void announceLeader(String collection, String shardId, ZkCoreNodeProps props);
+ }
+
+ private final String shard;
+ private final String collection;
+ private final String path;
+ private final SolrZkClient zkClient;
+ private volatile boolean closed = false;
+ private final ShardLeaderListener listener;
+
+ public ShardLeaderWatcher(String shard, String collection,
+ SolrZkClient zkClient, ShardLeaderListener listener) throws KeeperException, InterruptedException {
+ this.shard = shard;
+ this.collection = collection;
+ this.path = ZkStateReader.getShardLeadersPath(collection, shard);
+ this.zkClient = zkClient;
+ this.listener = listener;
+ processLeaderChange();
+ }
+
+ private void processLeaderChange() throws KeeperException, InterruptedException {
+ if(closed) return;
+ try {
+ byte[] data = zkClient.getData(path, this, null, true);
+ if (data != null) {
+ final ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(ZkNodeProps.load(data));
+ listener.announceLeader(collection, shard, leaderProps);
+ }
+ } catch (KeeperException ke) {
+ //check if we lost connection or the node was gone
+ if (ke.code() != Code.CONNECTIONLOSS && ke.code() != Code.SESSIONEXPIRED
+ && ke.code() != Code.NONODE) {
+ throw ke;
+ }
+ }
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ try {
+ processLeaderChange();
+ } catch (KeeperException e) {
+ logger.warn("Shard leader watch triggered but Solr cannot talk to zk.");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Shard leader watch triggered but Solr cannot talk to zk.");
+ }
+ }
+
+ public void close() {
+ closed = true;
+ }
+
+}
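A minimal sketch, not part of the commit, of wiring a listener into the ShardLeaderWatcher defined above. It uses only the constructor and ShardLeaderListener callback shown in the diff; the SolrZkClient is assumed to be already connected, the shard and collection names are placeholders, and the class would need to live in org.apache.solr.cloud because the listener interface is package-private:

package org.apache.solr.cloud; // same package: ShardLeaderListener is package-private

import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.zookeeper.KeeperException;

// Illustrative only: placeholder shard/collection names, zkClient assumed connected.
class ShardLeaderWatcherSketch {

  static ShardLeaderWatcher watchShardLeader(SolrZkClient zkClient)
      throws KeeperException, InterruptedException {
    ShardLeaderWatcher.ShardLeaderListener listener = new ShardLeaderWatcher.ShardLeaderListener() {
      @Override
      public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
        // called when the watcher reads leader data under the shard-leaders path
        System.out.println("leader for " + collection + "/" + shardId + ": " + props.getCoreUrl());
      }
    };
    // the constructor reads the current leader (if any) and registers the ZooKeeper watch
    return new ShardLeaderWatcher("shard1", "collection1", zkClient, listener);
  }
}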
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,227 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.PeerSync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncStrategy {
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ public boolean sync(ZkController zkController, SolrCore core,
+ ZkNodeProps leaderProps) {
+ zkController.publish(core, ZkStateReader.SYNC);
+
+ // solrcloud_debug
+ // System.out.println("SYNC UP");
+ boolean success = syncReplicas(zkController, core, leaderProps);
+ return success;
+ }
+
+ private boolean syncReplicas(ZkController zkController, SolrCore core,
+ ZkNodeProps leaderProps) {
+ boolean success = false;
+ CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+ String collection = cloudDesc.getCollectionName();
+ String shardId = cloudDesc.getShardId();
+
+ // first sync ourselves - we are the potential leader after all
+ try {
+ success = syncWithReplicas(zkController, core, leaderProps, collection,
+ shardId);
+ } catch (Exception e) {
+ SolrException.log(log, "Sync Failed", e);
+ }
+ try {
+ // if !success but no one else is in active mode,
+ // we are the leader anyway
+ // TODO: should we also be leader if there is only one other active?
+ // if we couldn't sync with it, it shouldn't be able to sync with us
+ if (!success
+ && !areAnyOtherReplicasActive(zkController, leaderProps, collection,
+ shardId)) {
+// System.out
+// .println("wasnt a success but no on else i active! I am the leader");
+
+ success = true;
+ }
+
+ if (success) {
+ // solrcloud_debug
+ // System.out.println("Sync success");
+ // we are the leader - tell all of our replicas to sync with us
+
+ syncToMe(zkController, collection, shardId, leaderProps);
+
+ } else {
+
+ // solrcloud_debug
+ // System.out.println("Sync failure");
+ }
+
+ } catch (Exception e) {
+ SolrException.log(log, "Sync Failed", e);
+ }
+
+ return success;
+ }
+
+ private boolean areAnyOtherReplicasActive(ZkController zkController,
+ ZkNodeProps leaderProps, String collection, String shardId) {
+ CloudState cloudState = zkController.getZkStateReader().getCloudState();
+ Map<String,Slice> slices = cloudState.getSlices(collection);
+ Slice slice = slices.get(shardId);
+ Map<String,ZkNodeProps> shards = slice.getShards();
+ for (Map.Entry<String,ZkNodeProps> shard : shards.entrySet()) {
+ String state = shard.getValue().get(ZkStateReader.STATE_PROP);
+// System.out.println("state:"
+// + state
+// + shard.getValue().get(ZkStateReader.NODE_NAME_PROP)
+// + " live: "
+// + cloudState.liveNodesContain(shard.getValue().get(
+// ZkStateReader.NODE_NAME_PROP)));
+ if ((state.equals(ZkStateReader.ACTIVE))
+ && cloudState.liveNodesContain(shard.getValue().get(
+ ZkStateReader.NODE_NAME_PROP))
+ && !new ZkCoreNodeProps(shard.getValue()).getCoreUrl().equals(
+ new ZkCoreNodeProps(leaderProps).getCoreUrl())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private boolean syncWithReplicas(ZkController zkController, SolrCore core,
+ ZkNodeProps props, String collection, String shardId)
+ throws MalformedURLException, SolrServerException, IOException {
+ List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
+ .getReplicaProps(collection, shardId,
+ props.get(ZkStateReader.NODE_NAME_PROP),
+ props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE); // TODO: should there be a state filter?
+
+ if (nodes == null) {
+ // I have no replicas
+ return true;
+ }
+
+ List<String> syncWith = new ArrayList<String>();
+ for (ZkCoreNodeProps node : nodes) {
+ // if we see a leader, must be stale state, and this is the guy that went down
+ if (!node.getNodeProps().keySet().contains(ZkStateReader.LEADER_PROP)) {
+ syncWith.add(node.getCoreUrl());
+ }
+ }
+
+
+ PeerSync peerSync = new PeerSync(core, syncWith, 1000);
+ return peerSync.sync();
+ }
+
+ private void syncToMe(ZkController zkController, String collection,
+ String shardId, ZkNodeProps leaderProps) throws MalformedURLException,
+ SolrServerException, IOException {
+
+ // sync everyone else
+ // TODO: we should do this in parallel at least
+ List<ZkCoreNodeProps> nodes = zkController
+ .getZkStateReader()
+ .getReplicaProps(collection, shardId,
+ leaderProps.get(ZkStateReader.NODE_NAME_PROP),
+ leaderProps.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE);
+ if (nodes == null) {
+ // System.out.println("I have no replicas");
+ // I have no replicas
+ return;
+ }
+ //System.out.println("tell my replicas to sync");
+ ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
+ for (ZkCoreNodeProps node : nodes) {
+ try {
+ // TODO: should we first have everyone register in a sync phase?
+ // get the overseer to do it?
+ // TODO: this should be done in parallel
+ QueryRequest qr = new QueryRequest(params("qt", "/get", "getVersions",
+ Integer.toString(1000), "sync", zkLeader.getCoreUrl(), "distrib",
+ "false"));
+ CommonsHttpSolrServer server = new CommonsHttpSolrServer(
+ node.getCoreUrl());
+ //System.out.println("ask " + node.getCoreUrl() + " to sync");
+ NamedList rsp = server.request(qr);
+ //System.out.println("response about syncing to leader:" + rsp + " node:"
+ // + node.getCoreUrl() + " me:" + zkController.getBaseUrl());
+ boolean success = (Boolean) rsp.get("sync");
+ //System.out.println("success:" + success);
+ if (!success) {
+ // System.out
+ // .println("try and ask " + node.getCoreUrl() + " to recover");
+ log.info("try and ask " + node.getCoreUrl() + " to recover");
+ try {
+ server = new CommonsHttpSolrServer(node.getBaseUrl());
+ server.setSoTimeout(5000);
+ server.setConnectionTimeout(5000);
+
+ RequestRecovery recoverRequestCmd = new RequestRecovery();
+ recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
+ recoverRequestCmd.setCoreName(node.getCoreName());
+
+ server.request(recoverRequestCmd);
+ } catch (Exception e) {
+ log.info("Could not tell a replica to recover", e);
+ }
+ }
+ } catch (Exception e) {
+ SolrException.log(log, "Error syncing replica to leader", e);
+ }
+ }
+ }
+
+ public static ModifiableSolrParams params(String... params) {
+ ModifiableSolrParams msp = new ModifiableSolrParams();
+ for (int i = 0; i < params.length; i += 2) {
+ msp.add(params[i], params[i + 1]);
+ }
+ return msp;
+ }
+}
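The params(String...) helper above packs alternating keys and values into a ModifiableSolrParams; syncToMe() uses it to build the /get sync request it sends to each replica. A minimal usage sketch with illustrative values, assuming it is compiled against the classes in this commit; the printed form is approximate:

import org.apache.solr.cloud.SyncStrategy;
import org.apache.solr.common.params.ModifiableSolrParams;

// Illustrative only: mirrors the parameters syncToMe() passes for the /get sync request.
public class ParamsSketch {
  public static void main(String[] args) {
    ModifiableSolrParams p = SyncStrategy.params(
        "qt", "/get",
        "getVersions", "1000",
        "distrib", "false");
    // prints the collected params, roughly: qt=/get&getVersions=1000&distrib=false
    System.out.println(p);
  }
}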
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Jan 25 19:49:26 2012
@@ -20,24 +20,36 @@ package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.CoreState;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.UpdateLog;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +73,7 @@ public final class ZkController {
private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
-
+
// package private for tests
static final String CONFIGS_ZKNODE = "/configs";
@@ -69,10 +81,14 @@ public final class ZkController {
public final static String COLLECTION_PARAM_PREFIX="collection.";
public final static String CONFIGNAME_PROP="configName";
- private SolrZkClient zkClient;
+ private final Map<String, CoreState> coreStates = Collections.synchronizedMap(new HashMap<String, CoreState>());
+ private SolrZkClient zkClient;
+ private ZkCmdExecutor cmdExecutor;
private ZkStateReader zkStateReader;
+ private LeaderElector leaderElector;
+
private String zkServerAddress;
private String localHostPort;
@@ -82,20 +98,61 @@ public final class ZkController {
private String hostName;
+ private LeaderElector overseerElector;
+
+ private boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
+
+ // this can be null in which case recovery will be inactive
+ private CoreContainer cc;
+
+ public static void main(String[] args) throws Exception {
+ // start up a tmp zk server first
+ String zkServerAddress = args[0];
+
+ String solrHome = args[1];
+ String solrPort = args[2];
+
+ String confDir = args[3];
+ String confName = args[4];
+
+ SolrZkServer zkServer = new SolrZkServer("true", null, solrHome, solrPort);
+ zkServer.parseConfig();
+ zkServer.start();
+
+ SolrZkClient zkClient = new SolrZkClient(zkServerAddress, 15000, 5000,
+ new OnReconnect() {
+ @Override
+ public void command() {
+ }});
+
+ uploadConfigDir(zkClient, new File(confDir), confName);
+
+ zkServer.stop();
+ }
+
+
/**
- * @param zkServerAddress ZooKeeper server host address
+ * @param coreContainer if null, recovery will not be enabled
+ * @param zkServerAddress
* @param zkClientTimeout
* @param zkClientConnectTimeout
* @param localHost
* @param locaHostPort
* @param localHostContext
+ * @param numShards
* @throws InterruptedException
* @throws TimeoutException
* @throws IOException
*/
- public ZkController(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
- String localHostContext) throws InterruptedException,
+ public ZkController(CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
+ String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException {
+ this.cc = cc;
+ if (localHostContext.contains("/")) {
+ throw new IllegalArgumentException("localHostContext ("
+ + localHostContext + ") should not contain a /");
+ }
+
this.zkServerAddress = zkServerAddress;
this.localHostPort = locaHostPort;
this.localHostContext = localHostContext;
@@ -107,68 +164,61 @@ public final class ZkController {
public void command() {
try {
- zkStateReader.makeCollectionsNodeWatches();
- zkStateReader.makeShardsWatches(true);
+ // we need to create all of our lost watches
+
+ // seems we don't need to do this again...
+ //Overseer.createClientNodes(zkClient, getNodeName());
+
+ ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+ overseerElector.joinElection(context);
+ zkStateReader.createClusterStateWatchersAndUpdate();
+
+ List<CoreDescriptor> descriptors = registerOnReconnect
+ .getCurrentDescriptors();
+ if (descriptors != null) {
+ // before registering as live, make sure everyone is in a
+ // recovery state
+ for (CoreDescriptor descriptor : descriptors) {
+ final String shardZkNodeName = getNodeName() + "_"
+ + descriptor.getName();
+ publishAsDown(getBaseUrl(), descriptor, shardZkNodeName,
+ descriptor.getName());
+ }
+ }
+
+ // we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();
- zkStateReader.updateCloudState(false);
- } catch (KeeperException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
+
+ // re-register all descriptors
+ if (descriptors != null) {
+ for (CoreDescriptor descriptor : descriptors) {
+ // TODO: we need to think carefully about what happens when it was
+ // a leader that was expired - as well as what to do about leaders/overseers
+ // with connection loss
+ register(descriptor.getName(), descriptor, true);
+ }
+ }
+
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
+ } catch (Exception e) {
+ SolrException.log(log, "", e);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR, "", e);
}
}
});
+ cmdExecutor = new ZkCmdExecutor();
+ leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient);
init();
}
/**
- * @param shardId
- * @param collection
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
- */
- private void addZkShardsNode(String shardId, String collection) throws IOException, InterruptedException, KeeperException {
-
- String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
-
- try {
-
- // shards node
- if (!zkClient.exists(shardsZkPath)) {
- if (log.isInfoEnabled()) {
- log.info("creating zk shards node:" + shardsZkPath);
- }
- // makes shards zkNode if it doesn't exist
- zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
-
- // TODO: consider how these notifications are being done
- // ping that there is a new shardId
- zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-
- }
- } catch (KeeperException e) {
- // its okay if another beats us creating the node
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- throw e;
- }
- }
-
- }
-
- /**
* Closes the underlying ZooKeeper client.
*/
public void close() {
@@ -177,7 +227,7 @@ public final class ZkController {
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
- log.error("", e);
+ log.warn("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
@@ -192,7 +242,7 @@ public final class ZkController {
*/
public boolean configFileExists(String collection, String fileName)
throws KeeperException, InterruptedException {
- Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null);
+ Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null, true);
return stat != null;
}
@@ -213,7 +263,7 @@ public final class ZkController {
public byte[] getConfigFileData(String zkConfigName, String fileName)
throws KeeperException, InterruptedException {
String zkPath = CONFIGS_ZKNODE + "/" + zkConfigName + "/" + fileName;
- byte[] bytes = zkClient.getData(zkPath, null, null);
+ byte[] bytes = zkClient.getData(zkPath, null, null, true);
if (bytes == null) {
log.error("Config file contains no data:" + zkPath);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -271,20 +321,17 @@ public final class ZkController {
}
// makes nodes zkNode
- try {
- zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
- } catch (KeeperException e) {
- // its okay if another beats us creating the node
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- }
+ cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
+ Overseer.createClientNodes(zkClient, getNodeName());
createEphemeralLiveNode();
- setUpCollectionsNode();
- zkStateReader.makeCollectionsNodeWatches();
+ cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
+
+ overseerElector = new LeaderElector(zkClient);
+ ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+ overseerElector.setup(context);
+ overseerElector.joinElection(context);
+ zkStateReader.createClusterStateWatchersAndUpdate();
} catch (IOException e) {
log.error("", e);
@@ -303,53 +350,17 @@ public final class ZkController {
}
}
+
+ public boolean isConnected() {
+ return zkClient.isConnected();
+ }
private void createEphemeralLiveNode() throws KeeperException,
InterruptedException {
String nodeName = getNodeName();
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
log.info("Register node as live in ZooKeeper:" + nodePath);
- Watcher liveNodeWatcher = new Watcher() {
-
- public void process(WatchedEvent event) {
- try {
- log.info("Updating live nodes:" + zkClient);
- try {
- zkStateReader.updateLiveNodes();
- } finally {
- // re-make watch
-
- String path = event.getPath();
- if(path == null) {
- // on shutdown, it appears this can trigger with a null path
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- zkClient.getChildren(event.getPath(), this);
- }
- } catch (KeeperException e) {
- if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
- log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
- return;
- }
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
-
- }
-
- };
+
try {
boolean nodeDeleted = true;
try {
@@ -358,7 +369,7 @@ public final class ZkController {
// until expiration timeout - so a node won't be created here because
// it exists, but eventually the node will be removed. So delete
// in case it exists and create a new node.
- zkClient.delete(nodePath, -1);
+ zkClient.delete(nodePath, -1, true);
} catch (KeeperException.NoNodeException e) {
// fine if there is nothing to delete
// TODO: annoying that ZK logs a warning on us
@@ -369,25 +380,17 @@ public final class ZkController {
.info("Found a previous node that still exists while trying to register a new live node "
+ nodePath + " - removing existing node to create another.");
}
- zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+ zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
} catch (KeeperException e) {
// its okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
throw e;
}
- }
- zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
- try {
- zkStateReader.updateLiveNodes();
- } catch (IOException e) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
+ }
}
public String getNodeName() {
- return hostName + ":" + localHostPort + "_"+ localHostContext;
+ return hostName + ":" + localHostPort + "_" + localHostContext;
}
/**
@@ -398,7 +401,7 @@ public final class ZkController {
*/
public boolean pathExists(String path) throws KeeperException,
InterruptedException {
- return zkClient.exists(path);
+ return zkClient.exists(path, true);
}
/**
@@ -417,15 +420,14 @@ public final class ZkController {
if (log.isInfoEnabled()) {
log.info("Load collection config from:" + path);
}
- byte[] data = zkClient.getData(path, null, null);
- ZkNodeProps props = new ZkNodeProps();
+ byte[] data = zkClient.getData(path, null, null, true);
if(data != null) {
- props.load(data);
+ ZkNodeProps props = ZkNodeProps.load(data);
configName = props.get(CONFIGNAME_PROP);
}
- if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName)) {
+ if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
log.error("Specified config does not exist in ZooKeeper:" + configName);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"Specified config does not exist in ZooKeeper:" + configName);
@@ -434,67 +436,224 @@ public final class ZkController {
return configName;
}
+
/**
* Register shard with ZooKeeper.
*
* @param coreName
* @param cloudDesc
- * @param forcePropsUpdate update solr.xml core props even if the shard is already registered
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
+ * @return
+ * @throws Exception
+ */
+ public String register(String coreName, final CoreDescriptor desc) throws Exception {
+ return register(coreName, desc, false);
+ }
+
+
+ /**
+ * Register shard with ZooKeeper.
+ *
+ * @param coreName
+ * @param desc
+ * @param recoverReloadedCores
+ * @return
+ * @throws Exception
*/
- public void register(String coreName, CloudDescriptor cloudDesc, boolean forcePropsUpdate) throws IOException,
- KeeperException, InterruptedException {
- String shardUrl = localHostName + ":" + localHostPort + "/" + localHostContext
- + "/" + coreName;
+ public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores) throws Exception {
+ final String baseUrl = getBaseUrl();
- String collection = cloudDesc.getCollectionName();
+ final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+ final String collection = cloudDesc.getCollectionName();
- String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
- boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
-
- if(shardZkNodeAlreadyExists && !forcePropsUpdate) {
- return;
- }
+ log.info("Attempting to update " + ZkStateReader.CLUSTER_STATE + " version "
+ + null);
+ CloudState state = CloudState.load(zkClient, zkStateReader.getCloudState().getLiveNodes());
+
+ final String coreZkNodeName = getNodeName() + "_" + coreName;
+ String shardId = cloudDesc.getShardId();
+
+ Map<String,String> props = new HashMap<String,String>();
+ props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+ props.put(ZkStateReader.CORE_NAME_PROP, coreName);
+ props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
+ props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
+
if (log.isInfoEnabled()) {
- log.info("Register shard - core:" + coreName + " address:"
- + shardUrl);
+ log.info("Register shard - core:" + coreName + " address:"
+ + baseUrl + " shardId:" + shardId);
}
- ZkNodeProps props = new ZkNodeProps();
- props.put(ZkStateReader.URL_PROP, shardUrl);
+ // we only put a subset of props into the leader node
+ ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
+ props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
+ props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.NODE_NAME_PROP,
+ props.get(ZkStateReader.NODE_NAME_PROP));
- props.put(ZkStateReader.NODE_NAME, getNodeName());
- byte[] bytes = props.store();
+ joinElection(collection, coreZkNodeName, shardId, leaderProps);
+
+ String leaderUrl = zkStateReader.getLeaderUrl(collection,
+ cloudDesc.getShardId(), 30000);
+
+ String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+ log.info("We are " + ourUrl + " and leader is " + leaderUrl);
+ boolean isLeader = leaderUrl.equals(ourUrl);
- String shardZkNodeName = getNodeName() + "_" + coreName;
- if(shardZkNodeAlreadyExists && forcePropsUpdate) {
- zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
- // tell everyone to update cloud info
- zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
- } else {
- addZkShardsNode(cloudDesc.getShardId(), collection);
+ SolrCore core = null;
+ if (cc != null) { // CoreContainer only null in tests
try {
- zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
- CreateMode.PERSISTENT);
- // tell everyone to update cloud info
- zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
- } catch (KeeperException e) {
- // its okay if the node already exists
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- throw e;
+ core = cc.getCore(desc.getName());
+
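+ // decide whether this core needs to go through recovery before it can be published as active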
+ boolean startRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+ collection, coreZkNodeName, shardId, leaderProps, core, cc);
+ if (!startRecovery) {
+ publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+ }
+ } finally {
+ if (core != null) {
+ core.close();
+ }
+ }
+ } else {
+ publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+ }
+
+ // make sure we have an updated cluster state right away
+ zkStateReader.updateCloudState(true);
+
+ return shardId;
+ }
+
+
+ private void joinElection(final String collection,
+ final String shardZkNodeName, String shardId, ZkNodeProps leaderProps) throws InterruptedException, KeeperException, IOException {
+ ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
+ collection, shardZkNodeName, leaderProps, this, cc);
+
+ leaderElector.setup(context);
+ leaderElector.joinElection(context);
+ }
+
+
+ private boolean checkRecovery(String coreName, final CoreDescriptor desc,
+ boolean recoverReloadedCores, final boolean isLeader,
+ final CloudDescriptor cloudDesc, final String collection,
+ final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
+ SolrCore core, CoreContainer cc) throws InterruptedException,
+ KeeperException, IOException, ExecutionException {
+
+
+ boolean doRecovery = true;
+
+
+ if (isLeader) {
+ doRecovery = false;
+
+ // recover from local transaction log and wait for it to complete before
+ // going active
+ // TODO: should this be moved to another thread? To recoveryStrat?
+ // TODO: should this actually be done earlier, before (or as part of)
+ // leader election perhaps?
+ // TODO: ensure that a replica that is trying to recover waits until I'm
+ // active (or don't make me the leader until my local replay is done).
+ // This replay is only needed on the leader - replicas will do recovery
+ // anyway.
+
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (!core.isReloaded() && ulog != null) {
+ Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
+ .getUpdateLog().recoverFromLog();
+ if (recoveryFuture != null) {
+ recoveryFuture.get(); // NOTE: this could potentially block for
+ // minutes or more!
+ // TODO: publish as recovering in the meantime?
}
- // for some reason the shard already exists, though it didn't when we
- // started registration - just return
- return;
}
+ return false;
+ } else {
+
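+ // a core that was only reloaded does not need recovery unless recoverReloadedCores was requested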
+ if (core.isReloaded() && !recoverReloadedCores) {
+ doRecovery = false;
+ }
+ }
+
+ if (doRecovery && !SKIP_AUTO_RECOVERY) {
+ log.info("Core needs to recover:" + core.getName());
+ core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+ return true;
}
+
+ return false;
+ }
+
+
+ public String getBaseUrl() {
+ final String baseUrl = localHostName + ":" + localHostPort + "/" + localHostContext;
+ return baseUrl;
+ }
+
+ void publishAsActive(String shardUrl,
+ final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+ Map<String,String> finalProps = new HashMap<String,String>();
+ finalProps.put(ZkStateReader.BASE_URL_PROP, shardUrl);
+ finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+ finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
+
+ publishState(cd, shardZkNodeName, coreName, finalProps);
+ }
+
+ public void publish(SolrCore core, String state) {
+ CoreDescriptor cd = core.getCoreDescriptor();
+ Map<String,String> finalProps = new HashMap<String,String>();
+ finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
+ finalProps.put(ZkStateReader.CORE_NAME_PROP, core.getName());
+ finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ finalProps.put(ZkStateReader.STATE_PROP, state);
+ publishState(cd, getNodeName() + "_" + core.getName(),
+ core.getName(), finalProps);
+ }
+
+ void publishAsDown(String baseUrl,
+ final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+ Map<String,String> finalProps = new HashMap<String,String>();
+ finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+ finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+ finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
+
+ publishState(cd, shardZkNodeName, coreName, finalProps);
+ }
+
+ void publishAsRecoveryFailed(String baseUrl,
+ final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+ Map<String,String> finalProps = new HashMap<String,String>();
+ finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+ finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
+ finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERY_FAILED);
+ publishState(cd, shardZkNodeName, coreName, finalProps);
+ }
+
+
+ private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
+ final CloudState state, final String shardZkNodeName) {
+
+ final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+
+ final String shardId = state.getShardId(shardZkNodeName);
+
+ if (shardId != null) {
+ cloudDesc.setShardId(shardId);
+ return false;
+ }
+ return true;
}
/**
@@ -513,16 +672,7 @@ public final class ZkController {
* @throws InterruptedException
*/
public void uploadToZK(File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
- File[] files = dir.listFiles();
- for(File file : files) {
- if (!file.getName().startsWith(".")) {
- if (!file.isDirectory()) {
- zkClient.setData(zkPath + "/" + file.getName(), file);
- } else {
- uploadToZK(file, zkPath + "/" + file.getName());
- }
- }
- }
+ uploadToZK(zkClient, dir, zkPath);
}
/**
@@ -533,7 +683,7 @@ public final class ZkController {
* @throws InterruptedException
*/
public void uploadConfigDir(File dir, String configName) throws IOException, KeeperException, InterruptedException {
- uploadToZK(dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
+ uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
}
// convenience for testing
@@ -541,32 +691,6 @@ public final class ZkController {
zkClient.printLayoutToStdOut();
}
- private void setUpCollectionsNode() throws KeeperException, InterruptedException {
- try {
- if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
- if (log.isInfoEnabled()) {
- log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
- }
- // makes collections zkNode if it doesn't exist
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
- }
- } catch (KeeperException e) {
- // its okay if another beats us creating the node
- if (e.code() != KeeperException.Code.NODEEXISTS) {
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
- "", e);
- }
-
- }
-
public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
String collection = cd.getCollectionName();
@@ -574,12 +698,12 @@ public final class ZkController {
String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
try {
- if(!zkClient.exists(collectionPath)) {
+ if(!zkClient.exists(collectionPath, true)) {
log.info("Creating collection in ZooKeeper:" + collection);
SolrParams params = cd.getParams();
try {
- ZkNodeProps collectionProps = new ZkNodeProps();
+ Map<String,String> collectionProps = new HashMap<String,String>();
// TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, "configuration1");
@@ -595,7 +719,7 @@ public final class ZkController {
// if the config name wasn't passed in, use the default
if (!collectionProps.containsKey(CONFIGNAME_PROP))
- collectionProps.put(CONFIGNAME_PROP, defaultConfigName);
+ getConfName(collection, collectionPath, collectionProps);
} else if(System.getProperty("bootstrap_confdir") != null) {
// if we are bootstrapping a collection, default the config for
@@ -614,32 +738,14 @@ public final class ZkController {
collectionProps.put(CONFIGNAME_PROP, defaultConfigName);
} else {
- // check for configName
- log.info("Looking for collection configName");
- int retry = 1;
- for (; retry < 6; retry++) {
- if (zkClient.exists(collectionPath)) {
- collectionProps = new ZkNodeProps();
- collectionProps.load(zkClient.getData(collectionPath, null, null));
- if (collectionProps.containsKey(CONFIGNAME_PROP)) {
- break;
- }
- }
- log.info("Could not find collection configName - pausing for 2 seconds and trying again - try: " + retry);
- Thread.sleep(2000);
- }
- if (retry == 6) {
- log.error("Could not find configName for collection " + collection);
- throw new ZooKeeperException(
- SolrException.ErrorCode.SERVER_ERROR,
- "Could not find configName for collection " + collection);
- }
+ getConfName(collection, collectionPath, collectionProps);
}
- zkClient.makePath(collectionPath, collectionProps.store(), CreateMode.PERSISTENT, null, true);
+ ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
+ zkClient.makePath(collectionPath, ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
// ping that there is a new collection
- zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+ zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null, true);
} catch (KeeperException e) {
// its okay if the node already exists
if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -658,9 +764,131 @@ public final class ZkController {
}
}
+
+
+ private void getConfName(String collection, String collectionPath,
+ Map<String,String> collectionProps) throws KeeperException,
+ InterruptedException {
+ // check for configName
+ log.info("Looking for collection configName");
+ int retry = 1;
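+ // poll for up to ~10 seconds (5 tries, 2 second pause) for a configName to show up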
+ for (; retry < 6; retry++) {
+ if (zkClient.exists(collectionPath, true)) {
+ ZkNodeProps cProps = ZkNodeProps.load(zkClient.getData(collectionPath, null, null, true));
+ if (cProps.containsKey(CONFIGNAME_PROP)) {
+ break;
+ }
+ }
+ // if there is only one conf, use that
+ List<String> configNames = zkClient.getChildren(CONFIGS_ZKNODE, null, true);
+ if (configNames.size() == 1) {
+ // no config set was specified, but there is only one - use it
+ log.info("Only one config set found in zk - using it:" + configNames.get(0));
+ collectionProps.put(CONFIGNAME_PROP, configNames.get(0));
+ break;
+ }
+ log.info("Could not find collection configName - pausing for 2 seconds and trying again - try: " + retry);
+ Thread.sleep(2000);
+ }
+ if (retry == 6) {
+ log.error("Could not find configName for collection " + collection);
+ throw new ZooKeeperException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Could not find configName for collection " + collection);
+ }
+ }
public ZkStateReader getZkStateReader() {
return zkStateReader;
}
+
+ private void publishState(CoreDescriptor cd, String shardZkNodeName, String coreName,
+ Map<String,String> props) {
+ CloudDescriptor cloudDesc = cd.getCloudDescriptor();
+
+ if (cloudDesc.getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getCloudState(), shardZkNodeName)) {
+ // publish with no shard id so we are assigned one, and then look for it
+ doPublish(shardZkNodeName, coreName, props, cloudDesc);
+ String shardId;
+ try {
+ shardId = doGetShardIdProcess(coreName, cloudDesc);
+ } catch (InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+ }
+ cloudDesc.setShardId(shardId);
+ }
+
+
+ if (!props.containsKey(ZkStateReader.SHARD_ID_PROP) && cloudDesc.getShardId() != null) {
+ props.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
+ }
+
+ doPublish(shardZkNodeName, coreName, props, cloudDesc);
+ }
+
+
+ private void doPublish(String shardZkNodeName, String coreName,
+ Map<String,String> props, CloudDescriptor cloudDesc) {
+
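+ // record this core's state locally and push all of this node's core states to ZK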
+ CoreState coreState = new CoreState(coreName,
+ cloudDesc.getCollectionName(), props);
+ coreStates.put(shardZkNodeName, coreState);
+ final String nodePath = "/node_states/" + getNodeName();
+
+ try {
+ zkClient.setData(nodePath, ZkStateReader.toJSON(coreStates.values()),
+ true);
+
+ } catch (KeeperException e) {
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "could not publish node state", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "could not publish node state", e);
+ }
+ }
+
+ private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor)
+ throws InterruptedException {
+ final String shardZkNodeName = getNodeName() + "_" + coreName;
+ int retryCount = 120;
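+ // poll the cloud state for up to a minute (120 * 500ms) waiting for a shard id to be assigned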
+ while (retryCount-- > 0) {
+ final String shardId = zkStateReader.getCloudState().getShardId(
+ shardZkNodeName);
+ if (shardId != null) {
+ return shardId;
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Could not get shard_id for core: " + coreName);
+ }
+
+ public static void uploadToZK(SolrZkClient zkClient, File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
+ File[] files = dir.listFiles();
+ if (files == null) {
+ throw new IllegalArgumentException("Illegal directory: " + dir);
+ }
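+ // skip hidden files; recurse into subdirectories, upload regular files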
+ for(File file : files) {
+ if (!file.getName().startsWith(".")) {
+ if (!file.isDirectory()) {
+ zkClient.makePath(zkPath + "/" + file.getName(), file, false, true);
+ } else {
+ uploadToZK(zkClient, file, zkPath + "/" + file.getName());
+ }
+ }
+ }
+ }
+
+ public static void uploadConfigDir(SolrZkClient zkClient, File dir, String configName) throws IOException, KeeperException, InterruptedException {
+ uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
+ }
+
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java Wed Jan 25 19:49:26 2012
@@ -74,7 +74,7 @@ public class ZkSolrResourceLoader extend
String file = collectionZkPath + "/" + resource;
try {
if (zkController.pathExists(file)) {
- byte[] bytes = zkController.getZkClient().getData(collectionZkPath + "/" + resource, null, null);
+ byte[] bytes = zkController.getZkClient().getData(collectionZkPath + "/" + resource, null, null, true);
return new ByteArrayInputStream(bytes);
}
} catch (Exception e) {
@@ -105,7 +105,7 @@ public class ZkSolrResourceLoader extend
public String[] listConfigDir() {
List<String> list;
try {
- list = zkController.getZkClient().getChildren(collectionZkPath, null);
+ list = zkController.getZkClient().getChildren(collectionZkPath, null, true);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@@ -119,5 +119,9 @@ public class ZkSolrResourceLoader extend
}
return list.toArray(new String[0]);
}
+
+ public String getCollectionZkPath() {
+ return collectionZkPath;
+ }
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java Wed Jan 25 19:49:26 2012
@@ -242,7 +242,10 @@ public class Config {
public String get(String path, String def) {
String val = getVal(path, false);
- return val!=null ? val : def;
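+ // treat an empty value the same as a missing one and fall back to the default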
+ if (val == null || val.length() == 0) {
+ return def;
+ }
+ return val;
}
public int getInt(String path) {