You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/01/25 20:49:30 UTC

svn commit: r1235888 [2/12] - in /lucene/dev/trunk: dev-tools/eclipse/ dev-tools/maven/ solr/ solr/cloud-dev/ solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/ solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/da...

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,450 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
+import org.apache.solr.cloud.ShardLeaderWatcher.ShardLeaderListener;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.CoreState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkOperation;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cluster leader. Responsible node assignments, cluster state file?
+ */
+public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
+  
+  public static final String ASSIGNMENTS_NODE = "/node_assignments";
+  public static final String STATES_NODE = "/node_states";
+  private static Logger log = LoggerFactory.getLogger(Overseer.class);
+  
+  private final SolrZkClient zkClient;
+  private final ZkStateReader reader;
+  
+  // node stateWatches
+  private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
+
+  // shard leader watchers  (collection->slice->watcher)
+  private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
+  private ZkCmdExecutor zkCmdExecutor;
+
+  public Overseer(final SolrZkClient zkClient, final ZkStateReader reader) throws KeeperException, InterruptedException {
+    log.info("Constructing new Overseer");
+    this.zkClient = zkClient;
+    this.zkCmdExecutor = new ZkCmdExecutor();
+    this.reader = reader;
+    createWatches();
+  }
+  
+  public synchronized void createWatches()
+      throws KeeperException, InterruptedException {
+    addCollectionsWatch();
+    addLiveNodesWatch();
+  }
+
+  /* 
+   * Watch for collections so we can add watches for its shard leaders.
+   */
+  private void addCollectionsWatch() throws KeeperException,
+      InterruptedException {
+    
+    zkCmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
+    
+    List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
+      @Override
+      public void process(WatchedEvent event) {
+        try {
+          List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this, true);
+          collectionsChanged(collections);
+        } catch (KeeperException e) {
+            if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
+            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+            return;
+          }
+        } catch (InterruptedException e) {
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+          log.warn("", e);
+        }
+      }
+    }, true);
+    
+    collectionsChanged(collections);
+  }
+  
+  private void collectionsChanged(Collection<String> collections) throws KeeperException, InterruptedException {
+    synchronized (shardLeaderWatches) {
+      for(String collection: collections) {
+        if(!shardLeaderWatches.containsKey(collection)) {
+          shardLeaderWatches.put(collection, new HashMap<String,ShardLeaderWatcher>());
+          addShardLeadersWatch(collection);
+        }
+      }
+      //XXX not handling delete collections..
+    }
+  }
+
+  /**
+   * Add a watch for node containing shard leaders for a collection
+   * @param collection
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  private void addShardLeadersWatch(final String collection) throws KeeperException,
+      InterruptedException {
+    
+    zkCmdExecutor.ensureExists(ZkStateReader.getShardLeadersPath(collection, null), zkClient);
+    
+    final List<String> leaderNodes = zkClient.getChildren(
+        ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
+          
+          @Override
+          public void process(WatchedEvent event) {
+            try {
+              List<String> leaderNodes = zkClient.getChildren(
+                  ZkStateReader.getShardLeadersPath(collection, null), this, true);
+              
+              processLeaderNodesChanged(collection, leaderNodes);
+            } catch (KeeperException e) {
+              if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                return;
+              }
+              SolrException.log(log, "", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+            }
+          }
+        }, true);
+    
+    processLeaderNodesChanged(collection, leaderNodes);
+  }
+
+  /**
+   * Process change in shard leaders. Make sure we have watches for each leader.
+   */
+  private void processLeaderNodesChanged(final String collection, final Collection<String> shardIds) {
+    if(log.isInfoEnabled()) {
+      log.info("Leader nodes changed for collection: " + collection + " nodes now:" + shardIds);
+    }
+    
+    Map<String, ShardLeaderWatcher> watches = shardLeaderWatches.get(collection);
+    Set<String> currentWatches = new HashSet<String>();
+    currentWatches.addAll(watches.keySet());
+    
+    Set<String> newLeaders = complement(shardIds, currentWatches);
+
+    Set<String> lostLeaders = complement(currentWatches, shardIds);
+    //remove watches for lost shards
+    for (String shardId : lostLeaders) {
+      ShardLeaderWatcher watcher = watches.remove(shardId);
+      if (watcher != null) {
+        watcher.close();
+        announceLeader(collection, shardId, new ZkCoreNodeProps(new ZkNodeProps()));  //removes loeader for shard
+      }
+    }
+    
+    //add watches for the new shards
+    for(String shardId: newLeaders) {
+      try {
+        ShardLeaderWatcher watcher = new ShardLeaderWatcher(shardId, collection, zkClient, this);
+        watches.put(shardId, watcher);
+      } catch (KeeperException e) {
+        log.error("Failed to create watcher for shard leader col:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.error("Failed to create watcher for shard leader col:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
+      }
+    }
+  }
+
+  private void addLiveNodesWatch() throws KeeperException,
+      InterruptedException {
+    List<String> liveNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
+      
+      @Override
+      public Object execute() throws KeeperException, InterruptedException {
+        return zkClient.getChildren(
+            ZkStateReader.LIVE_NODES_ZKNODE, new Watcher() {
+              
+              @Override
+              public void process(WatchedEvent event) {
+                try {
+                    List<String> liveNodes = zkClient.getChildren(
+                        ZkStateReader.LIVE_NODES_ZKNODE, this, true);
+                    Set<String> liveNodesSet = new HashSet<String>();
+                    liveNodesSet.addAll(liveNodes);
+                    processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
+                } catch (KeeperException e) {
+                  if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                      || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                    log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+                    return;
+                  }
+                  SolrException.log(log, "", e);
+                  throw new ZooKeeperException(
+                      SolrException.ErrorCode.SERVER_ERROR, "", e);
+                } catch (InterruptedException e) {
+                  // Restore the interrupted status
+                  Thread.currentThread().interrupt();
+                }
+              }
+            }, true);
+      }
+    });
+    
+    processLiveNodesChanged(Collections.<String>emptySet(), liveNodes);
+  }
+  
+  private void processLiveNodesChanged(Collection<String> oldLiveNodes,
+      Collection<String> liveNodes) throws InterruptedException, KeeperException {
+    
+    Set<String> upNodes = complement(liveNodes, oldLiveNodes);
+    if (upNodes.size() > 0) {
+      addNodeStateWatches(upNodes);
+    }
+    
+    Set<String> downNodes = complement(oldLiveNodes, liveNodes);
+    for(String node: downNodes) {
+      NodeStateWatcher watcher = nodeStateWatches.remove(node);
+    }
+  }
+  
+  private void addNodeStateWatches(Set<String> nodeNames) throws InterruptedException, KeeperException {
+    
+    for (String nodeName : nodeNames) {
+      final String path = STATES_NODE + "/" + nodeName;
+      synchronized (nodeStateWatches) {
+        if (!nodeStateWatches.containsKey(nodeName)) {
+          zkCmdExecutor.ensureExists(path, zkClient);
+          nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
+        } else {
+          log.debug("watch already added");
+        }
+      }
+    }
+  }
+  
+  /**
+   * Try to assign core to the cluster
+   * @throws KeeperException 
+   * @throws InterruptedException 
+   */
+  private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
+    String collection = coreState.getCollectionName();
+    String zkCoreNodeName = coreState.getCoreNodeName();
+    
+      String shardId;
+      if (coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP) == null) {
+        shardId = AssignShard.assignShard(collection, state);
+      } else {
+        shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
+      }
+      
+      Map<String,String> props = new HashMap<String,String>();
+      for (Entry<String,String> entry : coreState.getProperties().entrySet()) {
+        props.put(entry.getKey(), entry.getValue());
+      }
+      ZkNodeProps zkProps = new ZkNodeProps(props);
+      Slice slice = state.getSlice(collection, shardId);
+      Map<String,ZkNodeProps> shardProps;
+      if (slice == null) {
+        shardProps = new HashMap<String,ZkNodeProps>();
+      } else {
+        shardProps = state.getSlice(collection, shardId).getShardsCopy();
+      }
+      shardProps.put(zkCoreNodeName, zkProps);
+
+      slice = new Slice(shardId, shardProps);
+      CloudState newCloudState = updateSlice(state, collection, slice);
+      return newCloudState;
+  }
+  
+  private Set<String> complement(Collection<String> next,
+      Collection<String> prev) {
+    Set<String> downCollections = new HashSet<String>();
+    downCollections.addAll(next);
+    downCollections.removeAll(prev);
+    return downCollections;
+  }
+
+  @Override
+  public void coreChanged(final String nodeName, final Set<CoreState> states) throws KeeperException, InterruptedException  {
+    log.debug("Cores changed: " + nodeName + " states:" + states);
+    synchronized(reader.getUpdateLock()) {
+      reader.updateCloudState(true);
+      CloudState cloudState = reader.getCloudState();
+      for (CoreState state : states) {
+        cloudState = updateState(cloudState, nodeName, state);
+      }
+
+      try {
+        zkClient.setData(ZkStateReader.CLUSTER_STATE,
+            ZkStateReader.toJSON(cloudState), true);  
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Interrupted while publishing new state", e);
+      }
+    }
+  }
+  
+  public static void createClientNodes(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
+    final String node = STATES_NODE + "/" + nodeName;
+    if (log.isInfoEnabled()) {
+      log.info("creating node:" + node);
+    }
+    
+    ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
+    zkCmdExecutor.ensureExists(node, zkClient);
+  }
+  
+  private CloudState updateSlice(CloudState state, String collection, Slice slice) {
+    
+    final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+    newStates.putAll(state.getCollectionStates());
+    
+    if (!newStates.containsKey(collection)) {
+      newStates.put(collection, new LinkedHashMap<String,Slice>());
+    }
+    
+    final Map<String, Slice> slices = newStates.get(collection);
+    if (!slices.containsKey(slice.getName())) {
+      slices.put(slice.getName(), slice);
+    } else {
+      final Map<String,ZkNodeProps> shards = new LinkedHashMap<String,ZkNodeProps>();
+      final Slice existingSlice = slices.get(slice.getName());
+      shards.putAll(existingSlice.getShards());
+      //XXX preserve existing leader
+      for(Entry<String, ZkNodeProps> edit: slice.getShards().entrySet()) {
+        if(existingSlice.getShards().get(edit.getKey())!=null && existingSlice.getShards().get(edit.getKey()).containsKey(ZkStateReader.LEADER_PROP)) {
+          HashMap<String, String> newProps = new HashMap<String,String>();
+          newProps.putAll(edit.getValue().getProperties());
+          newProps.put(ZkStateReader.LEADER_PROP, existingSlice.getShards().get(edit.getKey()).get(ZkStateReader.LEADER_PROP));
+          shards.put(edit.getKey(), new ZkNodeProps(newProps));
+        } else {
+          shards.put(edit.getKey(), edit.getValue());
+        }
+      }
+      final Slice updatedSlice = new Slice(slice.getName(), shards);
+      slices.put(slice.getName(), updatedSlice);
+    }
+    return new CloudState(state.getLiveNodes(), newStates);
+  }
+
+  private CloudState setShardLeader(CloudState state, String collection, String sliceName, String leaderUrl) {
+    
+    boolean updated = false;
+    final Map<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
+    newStates.putAll(state.getCollectionStates());
+    
+    final Map<String, Slice> slices = newStates.get(collection);
+
+    if(slices==null) {
+      log.error("Could not mark shard leader for non existing collection.");
+      return state;
+    }
+    
+    if (!slices.containsKey(sliceName)) {
+      log.error("Could not mark leader for non existing slice.");
+      return state;
+    } else {
+      final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
+      for(Entry<String, ZkNodeProps> shard: slices.get(sliceName).getShards().entrySet()) {
+        Map<String, String> newShardProps = new LinkedHashMap<String,String>();
+        newShardProps.putAll(shard.getValue().getProperties());
+        
+        String wasLeader = newShardProps.remove(ZkStateReader.LEADER_PROP);  //clean any previously existed flag
+
+        ZkCoreNodeProps zkCoreNodeProps = new ZkCoreNodeProps(new ZkNodeProps(newShardProps));
+        if(leaderUrl!=null && leaderUrl.equals(zkCoreNodeProps.getCoreUrl())) {
+          newShardProps.put(ZkStateReader.LEADER_PROP,"true");
+          if (wasLeader == null) {
+            updated = true;
+          }
+        } else {
+          if (wasLeader != null) {
+            updated = true;
+          }
+        }
+        newShards.put(shard.getKey(), new ZkNodeProps(newShardProps));
+      }
+      Slice slice = new Slice(sliceName, newShards);
+      slices.put(sliceName, slice);
+    }
+    if (updated) {
+      return new CloudState(state.getLiveNodes(), newStates);
+    } else {
+      return state;
+    }
+  }
+
+  @Override
+  public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
+    synchronized (reader.getUpdateLock()) {
+      try {
+        reader.updateCloudState(true); // get fresh copy of the state
+      final CloudState state = reader.getCloudState();
+      final CloudState newState = setShardLeader(state, collection, shardId,
+          props.getCoreUrl());
+        if (state != newState) { // if same instance was returned no need to
+                                 // update state
+          log.info("Announcing new leader: coll: " + collection + " shard: " + shardId + " props:" + props);
+          zkClient.setData(ZkStateReader.CLUSTER_STATE,
+              ZkStateReader.toJSON(newState), true);
+          
+        } else {
+          log.debug("State was not changed.");
+        }
+      } catch (KeeperException e) {
+        log.warn("Could not announce new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.warn("Could not promote new leader coll:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
+      }
+    }
+  }
+  
+}
\ No newline at end of file

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,272 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.update.UpdateLog.RecoveryInfo;
+import org.apache.solr.util.RefCounted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background thread that brings a core back in sync with its shard leader.
+ * Each attempt buffers incoming updates, replicates the leader's index,
+ * replays the buffered updates and publishes the core as active. Failed
+ * attempts are retried with a growing pause (capped at 60s) until
+ * {@code MAX_RETRIES} is reached; an interrupted or closed recovery stops
+ * without publishing a failure.
+ */
+public class RecoveryStrategy extends Thread {
+  /** Number of failed attempts after which recovery is abandoned. */
+  private static final int MAX_RETRIES = 100;
+  /** Sentinel stored in {@code retries} when recovery was interrupted or closed. */
+  private static final int INTERRUPTED = 101;
+  /** Base pause in ms between attempts; multiplied by the attempt count. */
+  private static final int START_TIMEOUT = 100;
+  
+  private static final String REPLICATION_HANDLER = "/replication";
+
+  private static Logger log = LoggerFactory.getLogger(RecoveryStrategy.class);
+
+  private volatile boolean close = false;
+
+  private final ZkController zkController;
+  private final String baseUrl;
+  private final String coreZkNodeName;
+  private final ZkStateReader zkStateReader;
+  private volatile String coreName;
+  private int retries;
+  private final SolrCore core;
+  
+  public RecoveryStrategy(SolrCore core) {
+    this.core = core;
+    this.coreName = core.getName();
+    
+    zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+    zkStateReader = zkController.getZkStateReader();
+    baseUrl = zkController.getBaseUrl();
+    coreZkNodeName = zkController.getNodeName() + "_" + coreName;
+    
+  }
+  
+  // make sure any threads stop retrying
+  public void close() {
+    close = true;
+    interrupt();
+  }
+
+  /** Publishes the core as failed to recover and stops further retries. */
+  private void recoveryFailed(final SolrCore core,
+      final ZkController zkController, final String baseUrl,
+      final String shardZkNodeName, final CoreDescriptor cd) {
+    SolrException.log(log, "Recovery failed - I give up.");
+    zkController.publishAsRecoveryFailed(baseUrl, cd,
+        shardZkNodeName, core.getName());
+    close = true;
+  }
+  
+  /**
+   * Asks the leader to prepare for recovery and then synchronously fetches its
+   * index through this core's replication handler. Nothing is done when the
+   * leader's base URL equals {@code baseUrl}.
+   *
+   * @throws SolrException if no replication handler exists or the fetch fails
+   */
+  private void replicate(String nodeName, SolrCore core, String shardZkNodeName, ZkNodeProps leaderprops, String baseUrl)
+      throws SolrServerException, IOException {
+    // start buffer updates to tran log
+    // and do recovery - either replay via realtime get (eventually)
+    // or full index replication
+   
+    String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
+    ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
+    String leaderUrl = leaderCNodeProps.getCoreUrl();
+    String leaderCoreName = leaderCNodeProps.getCoreName();
+    
+    log.info("Attempt to replicate from " + leaderUrl);
+    
+    // if we are the leader, either we are trying to recover faster
+    // then our ephemeral timed out or we are the only node
+    if (!leaderBaseUrl.equals(baseUrl)) {
+      
+      CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderBaseUrl);
+      try {
+        server.setSoTimeout(15000);
+        PrepRecovery prepCmd = new PrepRecovery();
+        prepCmd.setCoreName(leaderCoreName);
+        prepCmd.setNodeName(nodeName);
+        prepCmd.setCoreNodeName(shardZkNodeName);
+        
+        server.request(prepCmd);
+      } finally {
+        // release the HTTP client even when the prep request fails
+        server.shutdown();
+      }
+      
+      // use rep handler directly, so we can do this sync rather than async
+      SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
+      if (handler instanceof LazyRequestHandlerWrapper) {
+        handler = ((LazyRequestHandlerWrapper)handler).getWrappedHandler();
+      }
+      ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+      
+      if (replicationHandler == null) {
+        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+            "Skipping recovery, no " + REPLICATION_HANDLER + " handler found");
+      }
+      
+      ModifiableSolrParams solrParams = new ModifiableSolrParams();
+      solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
+      
+      // if closed meanwhile, make sure a failed fetch is not retried
+      if (close) retries = INTERRUPTED; 
+      boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure fore=true does not download files we already have
+
+      if (!success) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
+      }
+      
+      // solrcloud_debug
+//      try {
+//        RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+//        SolrIndexSearcher searcher = searchHolder.get();
+//        try {
+//          System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replicated "
+//              + searcher.search(new MatchAllDocsQuery(), 1).totalHits + " from " + leaderUrl + " gen:" + core.getDeletionPolicy().getLatestCommit().getGeneration() + " data:" + core.getDataDir());
+//        } finally {
+//          searchHolder.decref();
+//        }
+//      } catch (Exception e) {
+//        
+//      }
+    }
+  }
+  
+  @Override
+  public void run() {
+    boolean replayed = false;
+    boolean succesfulRecovery = false;
+    
+    while (!succesfulRecovery && !close && !isInterrupted()) {
+      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+      if (ulog == null) return;
+      
+      ulog.bufferUpdates();
+      replayed = false;
+      CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+      try {
+        zkController.publish(core, ZkStateReader.RECOVERING);
+        
+        ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
+            cloudDesc.getCollectionName(), cloudDesc.getShardId());
+        
+        // NOTE(review): replicate() compares this argument with the leader's
+        // *base* URL, but a core URL is passed here - confirm the intent.
+        replicate(zkController.getNodeName(), core, coreZkNodeName,
+            leaderprops, ZkCoreNodeProps.getCoreUrl(baseUrl, coreName));
+        
+        replay(ulog);
+        replayed = true;
+        
+        // if there are pending recovery requests, don't advert as active
+        zkController.publishAsActive(baseUrl, core.getCoreDescriptor(), coreZkNodeName,
+            coreName);
+        
+        succesfulRecovery = true;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.warn("Recovery was interrupted", e);
+        retries = INTERRUPTED;
+      } catch (Throwable t) {
+        SolrException.log(log, "Error while trying to recover", t);
+      } finally {
+        if (!replayed) {
+          try {
+            ulog.dropBufferedUpdates();
+          } catch (Throwable t) {
+            SolrException.log(log, "", t);
+          }
+        }
+        
+      }
+      
+      if (!succesfulRecovery) {
+        // check the sentinel BEFORE counting the attempt: incrementing first would
+        // turn INTERRUPTED into INTERRUPTED + 1 and publish a bogus failure
+        if (retries == INTERRUPTED) {
+          break;
+        }
+        
+        // lets pause for a moment and we need to try again...
+        // TODO: we don't want to retry for some problems?
+        // Or do a fall off retry...
+        SolrException.log(log, "Recovery failed - trying again...");
+        retries++;
+        if (retries >= MAX_RETRIES) {
+          // TODO: for now, give up after MAX_RETRIES tries - should we do more?
+          try {
+            recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
+                core.getCoreDescriptor());
+          } catch (Exception e) {
+            SolrException.log(log, "", e);
+          }
+          break;
+        }
+        
+        try {
+          Thread.sleep(Math.min(START_TIMEOUT * retries, 60000));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.warn("Recovery was interrupted", e);
+          retries = INTERRUPTED;
+        }
+      }
+    }
+    
+    log.info("Finished recovery process");
+  }
+
+  /**
+   * Applies the updates buffered in the update log and waits for the replay
+   * to finish.
+   *
+   * @return the replay future, or {@code null} if nothing needed replaying
+   */
+  private Future<RecoveryInfo> replay(UpdateLog ulog)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    Future<RecoveryInfo> future = ulog.applyBufferedUpdates();
+    if (future == null) {
+      // no replay needed
+      log.info("No replay needed");
+    } else {
+      // wait for replay
+      future.get();
+    }
+    
+    // solrcloud_debug
+//    try {
+//      RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
+//      SolrIndexSearcher searcher = searchHolder.get();
+//      try {
+//        System.out.println(core.getCoreDescriptor().getCoreContainer().getZkController().getNodeName() + " replayed "
+//            + searcher.search(new MatchAllDocsQuery(), 1).totalHits);
+//      } finally {
+//        searchHolder.decref();
+//      }
+//    } catch (Exception e) {
+//      
+//    }
+    
+    return future;
+  }
+
+}

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ShardLeaderWatcher.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,90 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A watcher for shard leader.
+ */
+public class ShardLeaderWatcher implements Watcher {
+  private static Logger logger = LoggerFactory.getLogger(ShardLeaderWatcher.class); 
+  static interface ShardLeaderListener {
+    void announceLeader(String collection, String shardId, ZkCoreNodeProps props);
+  }
+  
+  private final String shard;
+  private final String collection;
+  private final String path;
+  private final SolrZkClient zkClient;
+  private volatile boolean closed = false;
+  private final ShardLeaderListener listener;
+  
+  public ShardLeaderWatcher(String shard, String collection,
+      SolrZkClient zkClient, ShardLeaderListener listener) throws KeeperException, InterruptedException {
+    this.shard = shard;
+    this.collection = collection;
+    this.path = ZkStateReader.getShardLeadersPath(collection, shard);
+    this.zkClient = zkClient;
+    this.listener = listener;
+    processLeaderChange();
+  }
+  
+  private void processLeaderChange() throws KeeperException, InterruptedException {
+    if(closed) return;
+    try {
+      byte[] data = zkClient.getData(path, this, null, true);
+      if (data != null) {
+        final ZkCoreNodeProps leaderProps = new ZkCoreNodeProps(ZkNodeProps.load(data));
+        listener.announceLeader(collection, shard, leaderProps);
+      }
+    } catch (KeeperException ke) {
+      //check if we lost connection or the node was gone
+      if (ke.code() != Code.CONNECTIONLOSS && ke.code() != Code.SESSIONEXPIRED
+          && ke.code() != Code.NONODE) {
+        throw ke;
+      }
+    }
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    try {
+      processLeaderChange();
+    } catch (KeeperException e) {
+      logger.warn("Shard leader watch triggered but Solr cannot talk to zk.");
+    } catch (InterruptedException e) {
+      Thread.interrupted();
+      logger.warn("Shard leader watch triggered but Solr cannot talk to zk.");
+    }
+  }
+  
+  public void close() {
+    closed = true;
+  }
+
+}

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1235888&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Wed Jan 25 19:49:26 2012
@@ -0,0 +1,227 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.PeerSync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncStrategy {
+  protected final Logger log = LoggerFactory.getLogger(getClass());
+
+  /** How many recent update versions are compared when peers sync. */
+  private static final int NUM_SYNC_VERSIONS = 1000;
+  
+  /**
+   * Publishes {@code core} in the {@link ZkStateReader#SYNC} state and then
+   * tries to sync it with the other active replicas of its shard.
+   *
+   * @param zkController controller used to publish state and read cluster state
+   * @param core the core that is a candidate for leadership
+   * @param leaderProps ZooKeeper properties describing {@code core} as leader
+   * @return true if the sync succeeded, or if it failed but no other live,
+   *         active replica of the shard exists
+   */
+  public boolean sync(ZkController zkController, SolrCore core,
+      ZkNodeProps leaderProps) {
+    zkController.publish(core, ZkStateReader.SYNC);
+    
+    // solrcloud_debug
+    // System.out.println("SYNC UP");
+    return syncReplicas(zkController, core, leaderProps);
+  }
+  
+  /**
+   * Syncs {@code core} with its replicas. On success, or when no other replica
+   * is active, asks every replica to sync with {@code core} in turn.
+   * Exceptions are logged, never thrown.
+   */
+  private boolean syncReplicas(ZkController zkController, SolrCore core,
+      ZkNodeProps leaderProps) {
+    boolean success = false;
+    CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+    String collection = cloudDesc.getCollectionName();
+    String shardId = cloudDesc.getShardId();
+
+    // first sync ourselves - we are the potential leader after all
+    try {
+      success = syncWithReplicas(zkController, core, leaderProps, collection,
+          shardId);
+    } catch (Exception e) {
+      SolrException.log(log, "Sync Failed", e);
+    }
+    try {
+      // if !success but no one else is in active mode,
+      // we are the leader anyway
+      // TODO: should we also be leader if there is only one other active?
+      // if we couldn't sync with it, it shouldn't be able to sync with us
+      if (!success
+          && !areAnyOtherReplicasActive(zkController, leaderProps, collection,
+              shardId)) {
+        success = true;
+      }
+      
+      if (success) {
+        // solrcloud_debug
+        // System.out.println("Sync success");
+        // we are the leader - tell all of our replicas to sync with us
+        syncToMe(zkController, collection, shardId, leaderProps);
+      } else {
+        // solrcloud_debug
+        // System.out.println("Sync failure");
+      }
+    } catch (Exception e) {
+      SolrException.log(log, "Sync Failed", e);
+    }
+    
+    return success;
+  }
+  
+  /**
+   * @return true if some replica of the shard, other than the core described
+   *         by {@code leaderProps}, is marked active and is on a live node
+   */
+  private boolean areAnyOtherReplicasActive(ZkController zkController,
+      ZkNodeProps leaderProps, String collection, String shardId) {
+    CloudState cloudState = zkController.getZkStateReader().getCloudState();
+    Map<String,Slice> slices = cloudState.getSlices(collection);
+    Slice slice = slices.get(shardId);
+    Map<String,ZkNodeProps> shards = slice.getShards();
+    // the leader's URL does not change while we scan the replicas
+    String leaderUrl = new ZkCoreNodeProps(leaderProps).getCoreUrl();
+    for (ZkNodeProps replica : shards.values()) {
+      // null-safe: a replica that has not published a state is not active
+      if (ZkStateReader.ACTIVE.equals(replica.get(ZkStateReader.STATE_PROP))
+          && cloudState.liveNodesContain(replica.get(ZkStateReader.NODE_NAME_PROP))
+          && !new ZkCoreNodeProps(replica).getCoreUrl().equals(leaderUrl)) {
+        return true;
+      }
+    }
+    
+    return false;
+  }
+  
+  /**
+   * Runs a {@link PeerSync} of {@code core} against the active replicas of the
+   * shard. Replicas that still carry the leader property are skipped.
+   *
+   * @return true if there are no replicas or the peer sync succeeded
+   */
+  private boolean syncWithReplicas(ZkController zkController, SolrCore core,
+      ZkNodeProps props, String collection, String shardId)
+      throws MalformedURLException, SolrServerException, IOException {
+    // TODO: should there be a state filter?
+    List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
+        .getReplicaProps(collection, shardId,
+            props.get(ZkStateReader.NODE_NAME_PROP),
+            props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE);
+    
+    if (nodes == null) {
+      // I have no replicas
+      return true;
+    }
+    
+    List<String> syncWith = new ArrayList<String>(nodes.size());
+    for (ZkCoreNodeProps node : nodes) {
+      // if we see a leader, must be stale state, and this is the guy that went down
+      if (!node.getNodeProps().keySet().contains(ZkStateReader.LEADER_PROP)) {
+        syncWith.add(node.getCoreUrl());
+      }
+    }
+    
+    PeerSync peerSync = new PeerSync(core, syncWith, NUM_SYNC_VERSIONS);
+    return peerSync.sync();
+  }
+  
+  /**
+   * Asks every active replica of the shard to sync with the leader described
+   * by {@code leaderProps}. A replica whose response does not report a
+   * successful sync is asked to recover. Per-replica failures are logged and
+   * do not stop the loop.
+   */
+  private void syncToMe(ZkController zkController, String collection,
+      String shardId, ZkNodeProps leaderProps) throws MalformedURLException,
+      SolrServerException, IOException {
+    
+    // sync everyone else
+    // TODO: we should do this in parallel at least
+    List<ZkCoreNodeProps> nodes = zkController
+        .getZkStateReader()
+        .getReplicaProps(collection, shardId,
+            leaderProps.get(ZkStateReader.NODE_NAME_PROP),
+            leaderProps.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.ACTIVE);
+    if (nodes == null) {
+      // I have no replicas
+      return;
+    }
+    ZkCoreNodeProps zkLeader = new ZkCoreNodeProps(leaderProps);
+    for (ZkCoreNodeProps node : nodes) {
+      try {
+        // TODO: do we first everyone register as sync phase? get the overseer
+        // to do it?
+        // TODO: this should be done in parallel
+        QueryRequest qr = new QueryRequest(params("qt", "/get", "getVersions",
+            Integer.toString(NUM_SYNC_VERSIONS), "sync", zkLeader.getCoreUrl(),
+            "distrib", "false"));
+        CommonsHttpSolrServer server = new CommonsHttpSolrServer(
+            node.getCoreUrl());
+        NamedList<Object> rsp = server.request(qr);
+        // a missing "sync" entry counts as a failed sync (instead of an NPE
+        // while unboxing), so the replica is still asked to recover
+        boolean success = Boolean.TRUE.equals(rsp.get("sync"));
+        if (!success) {
+          log.info("try and ask " + node.getCoreUrl() + " to recover");
+          requestRecovery(node);
+        }
+      } catch (Exception e) {
+        SolrException.log(log, "Error syncing replica to leader", e);
+      }
+    }
+  }
+
+  /**
+   * Sends a REQUESTRECOVERY core-admin command to {@code node}, using 5 second
+   * socket and connection timeouts. Failures are only logged.
+   */
+  private void requestRecovery(ZkCoreNodeProps node) {
+    try {
+      CommonsHttpSolrServer server = new CommonsHttpSolrServer(node.getBaseUrl());
+      server.setSoTimeout(5000);
+      server.setConnectionTimeout(5000);
+      
+      RequestRecovery recoverRequestCmd = new RequestRecovery();
+      recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
+      recoverRequestCmd.setCoreName(node.getCoreName());
+      
+      server.request(recoverRequestCmd);
+    } catch (Exception e) {
+      log.info("Could not tell a replica to recover", e);
+    }
+  }
+  
+  /**
+   * Builds request parameters from alternating name/value strings.
+   *
+   * @param params name1, value1, name2, value2, ...
+   * @return the parameters, added in the order given
+   * @throws IllegalArgumentException if an odd number of strings is passed
+   */
+  public static ModifiableSolrParams params(String... params) {
+    if (params.length % 2 != 0) {
+      throw new IllegalArgumentException(
+          "params must be name/value pairs but got " + params.length + " strings");
+    }
+    ModifiableSolrParams msp = new ModifiableSolrParams();
+    for (int i = 0; i < params.length; i += 2) {
+      msp.add(params[i], params[i + 1]);
+    }
+    return msp;
+  }
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Jan 25 19:49:26 2012
@@ -20,24 +20,36 @@ package org.apache.solr.cloud;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.CloudState;
+import org.apache.solr.common.cloud.CoreState;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.UpdateLog;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +73,7 @@ public final class ZkController {
   private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
   private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
 
-
+  
   // package private for tests
 
   static final String CONFIGS_ZKNODE = "/configs";
@@ -69,10 +81,14 @@ public final class ZkController {
   public final static String COLLECTION_PARAM_PREFIX="collection.";
   public final static String CONFIGNAME_PROP="configName";
 
-  private SolrZkClient zkClient;
+  private final Map<String, CoreState> coreStates = Collections.synchronizedMap(new HashMap<String, CoreState>());
   
+  private SolrZkClient zkClient;
+  private ZkCmdExecutor cmdExecutor;
   private ZkStateReader zkStateReader;
 
+  private LeaderElector leaderElector;
+  
   private String zkServerAddress;
 
   private String localHostPort;
@@ -82,20 +98,61 @@ public final class ZkController {
 
   private String hostName;
 
+  private LeaderElector overseerElector;
+  
+  private boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
+
+  // this can be null in which case recovery will be inactive
+  private CoreContainer cc;
+
+  /**
+   * Command-line helper that starts a temporary ZooKeeper server through
+   * {@link SolrZkServer}, uploads a configuration directory to it, and then
+   * stops the server again.
+   * <p>
+   * Arguments: {@code zkServerAddress solrHome solrPort confDir confName}.
+   *
+   * @throws IllegalArgumentException if fewer than five arguments are given
+   */
+  public static void main(String[] args) throws Exception {
+    if (args.length < 5) {
+      throw new IllegalArgumentException(
+          "Usage: ZkController <zkServerAddress> <solrHome> <solrPort> <confDir> <confName>");
+    }
+    String zkServerAddress = args[0];
+    String solrHome = args[1];
+    String solrPort = args[2];
+    String confDir = args[3];
+    String confName = args[4];
+    
+    // start up a tmp zk server first
+    SolrZkServer zkServer = new SolrZkServer("true", null, solrHome, solrPort);
+    zkServer.parseConfig();
+    zkServer.start();
+    try {
+      SolrZkClient zkClient = new SolrZkClient(zkServerAddress, 15000, 5000,
+          new OnReconnect() {
+            @Override
+            public void command() {
+              // nothing to re-establish for a one-shot upload
+            }
+          });
+      try {
+        uploadConfigDir(zkClient, new File(confDir), confName);
+      } finally {
+        // close the client before the server is stopped so it is not left
+        // attached to a server that is going away
+        zkClient.close();
+      }
+    } finally {
+      zkServer.stop();
+    }
+  }
+
+
   /**
-   * @param zkServerAddress ZooKeeper server host address
+   * @param coreContainer if null, recovery will not be enabled
+   * @param zkServerAddress
    * @param zkClientTimeout
    * @param zkClientConnectTimeout
    * @param localHost
    * @param locaHostPort
    * @param localHostContext
+   * @param numShards 
    * @throws InterruptedException
    * @throws TimeoutException
    * @throws IOException
    */
-  public ZkController(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
-      String localHostContext) throws InterruptedException,
+  public ZkController(CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
+      String localHostContext, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
       TimeoutException, IOException {
+    this.cc = cc;
+    if (localHostContext.contains("/")) {
+      throw new IllegalArgumentException("localHostContext ("
+          + localHostContext + ") should not contain a /");
+    }
+    
     this.zkServerAddress = zkServerAddress;
     this.localHostPort = locaHostPort;
     this.localHostContext = localHostContext;
@@ -107,68 +164,61 @@ public final class ZkController {
 
           public void command() {
             try {
-              zkStateReader.makeCollectionsNodeWatches();
-              zkStateReader.makeShardsWatches(true);
+              // we need to create all of our lost watches
+              
+              // seems we dont need to do this again...
+              //Overseer.createClientNodes(zkClient, getNodeName());
+
+              ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+              overseerElector.joinElection(context);
+              zkStateReader.createClusterStateWatchersAndUpdate();
+              
+              List<CoreDescriptor> descriptors = registerOnReconnect
+                  .getCurrentDescriptors();
+              if (descriptors != null) {
+                // before registering as live, make sure everyone is in a
+                // recovery state
+                for (CoreDescriptor descriptor : descriptors) {
+                  final String shardZkNodeName = getNodeName() + "_"
+                      + descriptor.getName();
+                  publishAsDown(getBaseUrl(), descriptor, shardZkNodeName,
+                      descriptor.getName());
+                }
+              }
+              
+              // we have to register as live first to pick up docs in the buffer
               createEphemeralLiveNode();
-              zkStateReader.updateCloudState(false);
-            } catch (KeeperException e) {
-              log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
+              
+              // re register all descriptors
+              if (descriptors != null) {
+                for (CoreDescriptor descriptor : descriptors) {
+                  // TODO: we need to think carefully about what happens when it was
+                  // a leader that was expired - as well as what to do about leaders/overseers
+                  // with connection loss
+                  register(descriptor.getName(), descriptor, true);
+                }
+              }
+  
             } catch (InterruptedException e) {
               // Restore the interrupted status
               Thread.currentThread().interrupt();
-              log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
-            } catch (IOException e) {
-              log.error("", e);
-              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-                  "", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (Exception e) {
+              SolrException.log(log, "", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
             }
 
           }
         });
+    cmdExecutor = new ZkCmdExecutor();
+    leaderElector = new LeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient);
     init();
   }
 
   /**
-   * @param shardId
-   * @param collection
-   * @throws IOException
-   * @throws InterruptedException 
-   * @throws KeeperException 
-   */
-  private void addZkShardsNode(String shardId, String collection) throws IOException, InterruptedException, KeeperException {
-
-    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + shardId;
-    
-    try {
-      
-      // shards node
-      if (!zkClient.exists(shardsZkPath)) {
-        if (log.isInfoEnabled()) {
-          log.info("creating zk shards node:" + shardsZkPath);
-        }
-        // makes shards zkNode if it doesn't exist
-        zkClient.makePath(shardsZkPath, CreateMode.PERSISTENT, null);
-        
-        // TODO: consider how these notifications are being done
-        // ping that there is a new shardId
-        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-
-      }
-    } catch (KeeperException e) {
-      // its okay if another beats us creating the node
-      if (e.code() != KeeperException.Code.NODEEXISTS) {
-        throw e;
-      }
-    }
-
-  }
-
-  /**
    * Closes the underlying ZooKeeper client.
    */
   public void close() {
@@ -177,7 +227,7 @@ public final class ZkController {
     } catch (InterruptedException e) {
       // Restore the interrupted status
       Thread.currentThread().interrupt();
-      log.error("", e);
+      log.warn("", e);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
           "", e);
     }
@@ -192,7 +242,7 @@ public final class ZkController {
    */
   public boolean configFileExists(String collection, String fileName)
       throws KeeperException, InterruptedException {
-    Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null);
+    Stat stat = zkClient.exists(CONFIGS_ZKNODE + "/" + collection + "/" + fileName, null, true);
     return stat != null;
   }
 
@@ -213,7 +263,7 @@ public final class ZkController {
   public byte[] getConfigFileData(String zkConfigName, String fileName)
       throws KeeperException, InterruptedException {
     String zkPath = CONFIGS_ZKNODE + "/" + zkConfigName + "/" + fileName;
-    byte[] bytes = zkClient.getData(zkPath, null, null);
+    byte[] bytes = zkClient.getData(zkPath, null, null, true);
     if (bytes == null) {
       log.error("Config file contains no data:" + zkPath);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -271,20 +321,17 @@ public final class ZkController {
       }
       
       // makes nodes zkNode
-      try {
-        zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
-      } catch (KeeperException e) {
-        // its okay if another beats us creating the node
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        }
-      }
+      cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
       
+      Overseer.createClientNodes(zkClient, getNodeName());
       createEphemeralLiveNode();
-      setUpCollectionsNode();
-      zkStateReader.makeCollectionsNodeWatches();
+      cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
+
+      overseerElector = new LeaderElector(zkClient);
+      ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+      overseerElector.setup(context);
+      overseerElector.joinElection(context);
+      zkStateReader.createClusterStateWatchersAndUpdate();
       
     } catch (IOException e) {
       log.error("", e);
@@ -303,53 +350,17 @@ public final class ZkController {
     }
 
   }
+  
+  /**
+   * Reports whether the underlying ZooKeeper client is currently connected.
+   *
+   * @return the value of {@code zkClient.isConnected()}
+   */
+  public boolean isConnected() {
+    final SolrZkClient client = this.zkClient;
+    return client.isConnected();
+  }
 
   private void createEphemeralLiveNode() throws KeeperException,
       InterruptedException {
     String nodeName = getNodeName();
     String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
     log.info("Register node as live in ZooKeeper:" + nodePath);
-    Watcher liveNodeWatcher = new Watcher() {
-
-      public void process(WatchedEvent event) {
-        try {
-          log.info("Updating live nodes:" + zkClient);
-          try {
-            zkStateReader.updateLiveNodes();
-          } finally {
-            // re-make watch
-
-            String path = event.getPath();
-            if(path == null) {
-              // on shutdown, it appears this can trigger with a null path
-              log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-              return;
-            }
-            zkClient.getChildren(event.getPath(), this);
-          }
-        } catch (KeeperException e) {
-          if(e.code() == KeeperException.Code.SESSIONEXPIRED || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-            return;
-          }
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        } catch (InterruptedException e) {
-          // Restore the interrupted status
-          Thread.currentThread().interrupt();
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        } catch (IOException e) {
-          log.error("", e);
-          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-              "", e);
-        }
-        
-      }
-      
-    };
+   
     try {
       boolean nodeDeleted = true;
       try {
@@ -358,7 +369,7 @@ public final class ZkController {
         // until expiration timeout - so a node won't be created here because
         // it exists, but eventually the node will be removed. So delete
         // in case it exists and create a new node.
-        zkClient.delete(nodePath, -1);
+        zkClient.delete(nodePath, -1, true);
       } catch (KeeperException.NoNodeException e) {
         // fine if there is nothing to delete
         // TODO: annoying that ZK logs a warning on us
@@ -369,25 +380,17 @@ public final class ZkController {
             .info("Found a previous node that still exists while trying to register a new live node "
                 + nodePath + " - removing existing node to create another.");
       }
-      zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
+      zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
     } catch (KeeperException e) {
       // its okay if the node already exists
       if (e.code() != KeeperException.Code.NODEEXISTS) {
         throw e;
       }
-    }
-    zkClient.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, liveNodeWatcher);
-    try {
-      zkStateReader.updateLiveNodes();
-    } catch (IOException e) {
-      log.error("", e);
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-          "", e);
-    }
+    }    
   }
   
   public String getNodeName() {
-    return hostName + ":" + localHostPort + "_"+ localHostContext;
+    return hostName + ":" + localHostPort + "_" + localHostContext;
   }
 
   /**
@@ -398,7 +401,7 @@ public final class ZkController {
    */
   public boolean pathExists(String path) throws KeeperException,
       InterruptedException {
-    return zkClient.exists(path);
+    return zkClient.exists(path, true);
   }
 
   /**
@@ -417,15 +420,14 @@ public final class ZkController {
     if (log.isInfoEnabled()) {
       log.info("Load collection config from:" + path);
     }
-    byte[] data = zkClient.getData(path, null, null);
-    ZkNodeProps props = new ZkNodeProps();
+    byte[] data = zkClient.getData(path, null, null, true);
     
     if(data != null) {
-      props.load(data);
+      ZkNodeProps props = ZkNodeProps.load(data);
       configName = props.get(CONFIGNAME_PROP);
     }
     
-    if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName)) {
+    if (configName != null && !zkClient.exists(CONFIGS_ZKNODE + "/" + configName, true)) {
       log.error("Specified config does not exist in ZooKeeper:" + configName);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
           "Specified config does not exist in ZooKeeper:" + configName);
@@ -434,67 +436,224 @@ public final class ZkController {
     return configName;
   }
 
+
   /**
    * Register shard with ZooKeeper.
    * 
    * @param coreName
    * @param cloudDesc
-   * @param forcePropsUpdate update solr.xml core props even if the shard is already registered
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
+   * @return
+   * @throws Exception 
+   */
+  public String register(String coreName, final CoreDescriptor desc) throws Exception {  
+    // Convenience overload: recoverReloadedCores=false, so a reloaded
+    // (non-leader) core does not start recovery as part of this call.
+    return register(coreName, desc, false);
+  }
+  
+
+  /**
+   * Register shard with ZooKeeper.
+   * 
+   * @param coreName
+   * @param desc
+   * @param recoverReloadedCores
+   * @return
+   * @throws Exception
    */
-  public void register(String coreName, CloudDescriptor cloudDesc, boolean forcePropsUpdate) throws IOException,
-      KeeperException, InterruptedException {
-    String shardUrl = localHostName + ":" + localHostPort + "/" + localHostContext
-        + "/" + coreName;
+  public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores) throws Exception {  
+    final String baseUrl = getBaseUrl();
     
-    String collection = cloudDesc.getCollectionName();
+    final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+    final String collection = cloudDesc.getCollectionName();
     
-    String shardsZkPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + ZkStateReader.SHARDS_ZKNODE + "/" + cloudDesc.getShardId();
 
-    boolean shardZkNodeAlreadyExists = zkClient.exists(shardsZkPath);
-    
-    if(shardZkNodeAlreadyExists && !forcePropsUpdate) {
-      return;
-    }
+    log.info("Attempting to update " + ZkStateReader.CLUSTER_STATE + " version "
+        + null);
+    CloudState state = CloudState.load(zkClient, zkStateReader.getCloudState().getLiveNodes());
+
+    final String coreZkNodeName = getNodeName() + "_" + coreName;
     
+    String shardId = cloudDesc.getShardId();
+
+    Map<String,String> props = new HashMap<String,String>();
+    props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+    props.put(ZkStateReader.CORE_NAME_PROP, coreName);
+    props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
+    props.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
+
     if (log.isInfoEnabled()) {
-      log.info("Register shard - core:" + coreName + " address:"
-          + shardUrl);
+        log.info("Register shard - core:" + coreName + " address:"
+            + baseUrl + " shardId:" + shardId);
     }
 
-    ZkNodeProps props = new ZkNodeProps();
-    props.put(ZkStateReader.URL_PROP, shardUrl);
+    // we only put a subset of props into the leader node
+    ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
+        props.get(ZkStateReader.BASE_URL_PROP), ZkStateReader.CORE_NAME_PROP,
+        props.get(ZkStateReader.CORE_NAME_PROP), ZkStateReader.NODE_NAME_PROP,
+        props.get(ZkStateReader.NODE_NAME_PROP));
     
-    props.put(ZkStateReader.NODE_NAME, getNodeName());
 
-    byte[] bytes = props.store();
+    joinElection(collection, coreZkNodeName, shardId, leaderProps);
+    
+    String leaderUrl = zkStateReader.getLeaderUrl(collection,
+        cloudDesc.getShardId(), 30000);
+    
+    String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+    log.info("We are " + ourUrl + " and leader is " + leaderUrl);
+    boolean isLeader = leaderUrl.equals(ourUrl);
     
-    String shardZkNodeName = getNodeName() + "_" + coreName;
 
-    if(shardZkNodeAlreadyExists && forcePropsUpdate) {
-      zkClient.setData(shardsZkPath + "/" + shardZkNodeName, bytes);
-      // tell everyone to update cloud info
-      zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-    } else {
-      addZkShardsNode(cloudDesc.getShardId(), collection);
+    SolrCore core = null;
+    if (cc != null) { // CoreContainer only null in tests
       try {
-        zkClient.create(shardsZkPath + "/" + shardZkNodeName, bytes,
-            CreateMode.PERSISTENT);
-        // tell everyone to update cloud info
-        zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
-      } catch (KeeperException e) {
-        // its okay if the node already exists
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
-          throw e;
+        core = cc.getCore(desc.getName());
+
+        boolean startRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+            collection, coreZkNodeName, shardId, leaderProps, core, cc);
+        if (!startRecovery) {
+          publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+        }
+      } finally {
+        if (core != null) {
+          core.close();
+        }
+      }
+    } else {
+      publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+    }
+    
+    // make sure we have an update cluster state right away
+    zkStateReader.updateCloudState(true);
+
+    return shardId;
+  }
+
+
+  /**
+   * Joins the leader election for one shard of a collection.
+   * A ShardLeaderElectionContext is built from the given arguments plus this
+   * controller and its CoreContainer. It is handed to leaderElector.setup()
+   * and then to leaderElector.joinElection().
+   *
+   * @param collection collection the shard belongs to
+   * @param shardZkNodeName node-name + "_" + core-name identifier of this core
+   * @param shardId shard whose election is joined
+   * @param leaderProps props to publish should this core become leader
+   */
+  private void joinElection(final String collection,
+      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps) throws InterruptedException, KeeperException, IOException {
+    ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
+        collection, shardZkNodeName, leaderProps, this, cc);
+    
+    leaderElector.setup(context);
+    leaderElector.joinElection(context);
+  }
+
+
+  private boolean checkRecovery(String coreName, final CoreDescriptor desc,
+      boolean recoverReloadedCores, final boolean isLeader,
+      final CloudDescriptor cloudDesc, final String collection,
+      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
+      SolrCore core, CoreContainer cc) throws InterruptedException,
+      KeeperException, IOException, ExecutionException {
+
+    
+    boolean doRecovery = true;
+
+
+    if (isLeader) {
+      doRecovery = false;
+      
+      // recover from local transaction log and wait for it to complete before
+      // going active
+      // TODO: should this be moved to another thread? To recoveryStrat?
+      // TODO: should this actually be done earlier, before (or as part of)
+      // leader election perhaps?
+      // TODO: ensure that a replica that is trying to recover waits until I'm
+      // active (or don't make me the
+      // leader until my local replay is done. But this replay is only needed
+      // on the leader - replicas
+      // will do recovery anyway
+      
+      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+      if (!core.isReloaded() && ulog != null) {
+        Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
+            .getUpdateLog().recoverFromLog();
+        if (recoveryFuture != null) {
+          recoveryFuture.get(); // NOTE: this could potentially block for
+                                // minutes or more!
+          // TODO: public as recovering in the mean time?
         }
-        // for some reason the shard already exists, though it didn't when we
-        // started registration - just return
-        return;
       }
+      return false;
+    } else {
+      
+      if (core.isReloaded() && !recoverReloadedCores) {
+        doRecovery = false;
+      }
+    }
+    
+    if (doRecovery && !SKIP_AUTO_RECOVERY) {
+      log.info("Core needs to recover:" + core.getName());
+      core.getUpdateHandler().getSolrCoreState().doRecovery(core);
+      return true;
     }
+    
+    return false;
+  }
+
+
+  /**
+   * Returns this node's base URL in the form {@code host:port/context}.
+   * The value is built from localHostName, localHostPort and localHostContext.
+   */
+  public String getBaseUrl() {
+    return localHostName + ":" + localHostPort + "/" + localHostContext;
+  }
+
 
+  /**
+   * Builds the per-core property map that is published to ZooKeeper.
+   *
+   * @param baseUrl base URL of this node
+   * @param coreName name of the core
+   * @param state value stored under ZkStateReader.STATE_PROP
+   * @return a new mutable map (publishState may add the shard id to it)
+   */
+  private Map<String,String> buildStateProps(String baseUrl, String coreName,
+      String state) {
+    Map<String,String> props = new HashMap<String,String>();
+    props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
+    props.put(ZkStateReader.CORE_NAME_PROP, coreName);
+    props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    props.put(ZkStateReader.STATE_PROP, state);
+    return props;
+  }
+
+  /** Publishes the core as ZkStateReader.ACTIVE. */
+  void publishAsActive(String shardUrl,
+      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+    publishState(cd, shardZkNodeName, coreName,
+        buildStateProps(shardUrl, coreName, ZkStateReader.ACTIVE));
+  }
+
+  /**
+   * Publishes an arbitrary state for the given core.
+   * Uses this node's base URL and the node-name + "_" + core-name identifier.
+   */
+  public void publish(SolrCore core, String state) {
+    CoreDescriptor cd = core.getCoreDescriptor();
+    publishState(cd, getNodeName() + "_" + core.getName(), core.getName(),
+        buildStateProps(getBaseUrl(), core.getName(), state));
+  }
+
+  /** Publishes the core as ZkStateReader.DOWN. */
+  void publishAsDown(String baseUrl,
+      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+    publishState(cd, shardZkNodeName, coreName,
+        buildStateProps(baseUrl, coreName, ZkStateReader.DOWN));
+  }
+
+  /** Publishes the core as ZkStateReader.RECOVERY_FAILED. */
+  void publishAsRecoveryFailed(String baseUrl,
+      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
+    publishState(cd, shardZkNodeName, coreName,
+        buildStateProps(baseUrl, coreName, ZkStateReader.RECOVERY_FAILED));
+  }
+
+
+  /**
+   * Reports whether the core still needs a shard id.
+   * If the cloud state already maps shardZkNodeName to a shard, that id is
+   * copied into the core's CloudDescriptor.
+   *
+   * @return true when the cloud state has no shard id for shardZkNodeName
+   */
+  private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
+      final CloudState state, final String shardZkNodeName) {
+    final CloudDescriptor descriptor = desc.getCloudDescriptor();
+    final String knownShardId = state.getShardId(shardZkNodeName);
+    if (knownShardId == null) {
+      return true;
+    }
+    descriptor.setShardId(knownShardId);
+    return false;
   }
 
   /**
@@ -513,16 +672,7 @@ public final class ZkController {
    * @throws InterruptedException
    */
   public void uploadToZK(File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
-    File[] files = dir.listFiles();
-    for(File file : files) {
-      if (!file.getName().startsWith(".")) {
-        if (!file.isDirectory()) {
-          zkClient.setData(zkPath + "/" + file.getName(), file);
-        } else {
-          uploadToZK(file, zkPath + "/" + file.getName());
-        }
-      }
-    }
+    uploadToZK(zkClient, dir, zkPath);
   }
   
   /**
@@ -533,7 +683,7 @@ public final class ZkController {
    * @throws InterruptedException
    */
   public void uploadConfigDir(File dir, String configName) throws IOException, KeeperException, InterruptedException {
-    uploadToZK(dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
+    uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
   }
 
   // convenience for testing
@@ -541,32 +691,6 @@ public final class ZkController {
     zkClient.printLayoutToStdOut();
   }
 
-  private void setUpCollectionsNode() throws KeeperException, InterruptedException {
-    try {
-      if (!zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE)) {
-        if (log.isInfoEnabled()) {
-          log.info("creating zk collections node:" + ZkStateReader.COLLECTIONS_ZKNODE);
-        }
-        // makes collections zkNode if it doesn't exist
-        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE, CreateMode.PERSISTENT, null);
-      }
-    } catch (KeeperException e) {
-      // its okay if another beats us creating the node
-      if (e.code() != KeeperException.Code.NODEEXISTS) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "", e);
-      }
-    } catch (InterruptedException e) {
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-      log.error("", e);
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-          "", e);
-    }
-    
-  }
-
   public void createCollectionZkNode(CloudDescriptor cd) throws KeeperException, InterruptedException, IOException {
     String collection = cd.getCollectionName();
     
@@ -574,12 +698,12 @@ public final class ZkController {
     String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
     
     try {
-      if(!zkClient.exists(collectionPath)) {
+      if(!zkClient.exists(collectionPath, true)) {
         log.info("Creating collection in ZooKeeper:" + collection);
        SolrParams params = cd.getParams();
 
         try {
-          ZkNodeProps collectionProps = new ZkNodeProps();
+          Map<String,String> collectionProps = new HashMap<String,String>();
           // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
           String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, "configuration1");
 
@@ -595,7 +719,7 @@ public final class ZkController {
 
             // if the config name wasn't passed in, use the default
             if (!collectionProps.containsKey(CONFIGNAME_PROP))
-              collectionProps.put(CONFIGNAME_PROP,  defaultConfigName);
+              getConfName(collection, collectionPath, collectionProps);
             
           } else if(System.getProperty("bootstrap_confdir") != null) {
             // if we are bootstrapping a collection, default the config for
@@ -614,32 +738,14 @@ public final class ZkController {
               collectionProps.put(CONFIGNAME_PROP,  defaultConfigName);
 
           } else {
-            // check for configName
-            log.info("Looking for collection configName");
-            int retry = 1;
-            for (; retry < 6; retry++) {
-              if (zkClient.exists(collectionPath)) {
-                collectionProps = new ZkNodeProps();
-                collectionProps.load(zkClient.getData(collectionPath, null, null));
-                if (collectionProps.containsKey(CONFIGNAME_PROP)) {
-                  break;
-                }
-              }
-              log.info("Could not find collection configName - pausing for 2 seconds and trying again - try: " + retry);
-              Thread.sleep(2000);
-            }
-            if (retry == 6) {
-              log.error("Could not find configName for collection " + collection);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR,
-                  "Could not find configName for collection " + collection);
-            }
+            getConfName(collection, collectionPath, collectionProps);
           }
           
-          zkClient.makePath(collectionPath, collectionProps.store(), CreateMode.PERSISTENT, null, true);
+          ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
+          zkClient.makePath(collectionPath, ZkStateReader.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
          
           // ping that there is a new collection
-          zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null);
+          zkClient.setData(ZkStateReader.COLLECTIONS_ZKNODE, (byte[])null, true);
         } catch (KeeperException e) {
           // its okay if the node already exists
           if (e.code() != KeeperException.Code.NODEEXISTS) {
@@ -658,9 +764,131 @@ public final class ZkController {
     }
     
   }
+
+
+  /**
+   * Resolves the config set name for a collection and stores it in
+   * collectionProps under CONFIGNAME_PROP.
+   * Makes up to 5 attempts, pausing 2 seconds between them. Each attempt
+   * first reads the collection's existing znode. If that has no config name
+   * and exactly one config set exists under CONFIGS_ZKNODE, that one is used.
+   *
+   * @throws ZooKeeperException if no config name is found after all attempts
+   */
+  private void getConfName(String collection, String collectionPath,
+      Map<String,String> collectionProps) throws KeeperException,
+      InterruptedException {
+    // check for configName
+    log.info("Looking for collection configName");
+    int retry = 1;
+    for (; retry < 6; retry++) {
+      if (zkClient.exists(collectionPath, true)) {
+        ZkNodeProps cProps = ZkNodeProps.load(zkClient.getData(collectionPath, null, null, true));
+        if (cProps.containsKey(CONFIGNAME_PROP)) {
+          // copy it over: the caller persists collectionProps, not cProps
+          collectionProps.put(CONFIGNAME_PROP, cProps.get(CONFIGNAME_PROP));
+          break;
+        }
+      }
+      // if there is only one conf, use that
+      List<String> configNames = zkClient.getChildren(CONFIGS_ZKNODE, null, true);
+      if (configNames.size() == 1) {
+        // no config set named, but there is only 1 - use it
+        log.info("Only one config set found in zk - using it:" + configNames.get(0));
+        collectionProps.put(CONFIGNAME_PROP,  configNames.get(0));
+        break;
+      }
+      log.info("Could not find collection configName - pausing for 2 seconds and trying again - try: " + retry);
+      Thread.sleep(2000);
+    }
+    if (retry == 6) {
+      log.error("Could not find configName for collection " + collection);
+      throw new ZooKeeperException(
+          SolrException.ErrorCode.SERVER_ERROR,
+          "Could not find configName for collection " + collection);
+    }
+  }
   
   public ZkStateReader getZkStateReader() {
     return zkStateReader;
   }
 
+  
+  /**
+   * Publishes props for a core to ZooKeeper.
+   * If the core has no shard id yet and the cloud state has none for
+   * shardZkNodeName, it first publishes without a shard id. It then waits for
+   * one to be assigned and stores it in the core's CloudDescriptor. The shard
+   * id (when known) is added to props before the final publish.
+   *
+   * @throws SolrException if interrupted while waiting for a shard id (the
+   *         thread's interrupt status is restored)
+   */
+  private void publishState(CoreDescriptor cd, String shardZkNodeName, String coreName,
+      Map<String,String> props) {
+    CloudDescriptor cloudDesc = cd.getCloudDescriptor();
+    
+    if (cloudDesc.getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getCloudState(), shardZkNodeName)) {
+      // publish with no shard id so we are assigned one, and then look for it
+      doPublish(shardZkNodeName, coreName, props, cloudDesc);
+      String shardId;
+      try {
+        shardId = doGetShardIdProcess(coreName, cloudDesc);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status and keep the cause for diagnosis
+        Thread.currentThread().interrupt();
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted", e);
+      }
+      cloudDesc.setShardId(shardId);
+    }
+    
+    if (!props.containsKey(ZkStateReader.SHARD_ID_PROP) && cloudDesc.getShardId() != null) {
+      props.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
+    }
+    
+    doPublish(shardZkNodeName, coreName, props, cloudDesc);
+  }
+
+
+  /**
+   * Records the core's state in coreStates, keyed by shardZkNodeName.
+   * It then writes the JSON of all recorded core states to
+   * "/node_states/" + this node's name.
+   *
+   * @throws ZooKeeperException wrapping a KeeperException or
+   *         InterruptedException from the write (interrupt status restored)
+   */
+  private void doPublish(String shardZkNodeName, String coreName,
+      Map<String,String> props, CloudDescriptor cloudDesc) {
+    coreStates.put(shardZkNodeName,
+        new CoreState(coreName, cloudDesc.getCollectionName(), props));
+    final String statePath = "/node_states/" + getNodeName();
+    try {
+      zkClient.setData(statePath, ZkStateReader.toJSON(coreStates.values()), true);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "could not publish node state", e);
+    } catch (KeeperException e) {
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+          "could not publish node state", e);
+    }
+  }
+
+  /**
+   * Waits until the cloud state assigns a shard id to this node's core.
+   * Polls every 500 ms, at most 120 times.
+   *
+   * @param coreName name of the core
+   * @param descriptor currently unused
+   * @return the assigned shard id
+   * @throws InterruptedException if interrupted while waiting
+   * @throws SolrException if no shard id appears within the retry budget
+   */
+  private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor)
+      throws InterruptedException {
+    final String shardZkNodeName = getNodeName() + "_" + coreName;
+    int retryCount = 120;
+    while (retryCount-- > 0) {
+      final String shardId = zkStateReader.getCloudState().getShardId(
+          shardZkNodeName);
+      if (shardId != null) {
+        return shardId;
+      }
+      // Let an interrupt propagate. Swallowing it (and re-setting the flag)
+      // made every later sleep fail at once, turning the wait into a spin.
+      Thread.sleep(500);
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR,
+        "Could not get shard_id for core: " + coreName);
+  }
+  
+  /**
+   * Recursively uploads the contents of dir below zkPath.
+   * Each regular file is written to zkPath + "/" + its name via
+   * zkClient.makePath(path, file, false, true), and each sub-directory is
+   * descended into. Entries whose names start with "." are skipped.
+   *
+   * @throws IllegalArgumentException if dir cannot be listed
+   */
+  public static void uploadToZK(SolrZkClient zkClient, File dir, String zkPath) throws IOException, KeeperException, InterruptedException {
+    final File[] entries = dir.listFiles();
+    if (entries == null) {
+      throw new IllegalArgumentException("Illegal directory: " + dir);
+    }
+    for (final File entry : entries) {
+      final String name = entry.getName();
+      if (name.startsWith(".")) {
+        continue; // skip hidden entries
+      }
+      final String childPath = zkPath + "/" + name;
+      if (entry.isDirectory()) {
+        uploadToZK(zkClient, entry, childPath);
+      } else {
+        zkClient.makePath(childPath, entry, false, true);
+      }
+    }
+  }
+  
+  /**
+   * Uploads dir as the config set named configName.
+   * The target path is ZkController.CONFIGS_ZKNODE + "/" + configName.
+   */
+  public static void uploadConfigDir(SolrZkClient zkClient, File dir, String configName) throws IOException, KeeperException, InterruptedException {
+    uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
+  }
+
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkSolrResourceLoader.java Wed Jan 25 19:49:26 2012
@@ -74,7 +74,7 @@ public class ZkSolrResourceLoader extend
     String file = collectionZkPath + "/" + resource;
     try {
       if (zkController.pathExists(file)) {
-        byte[] bytes = zkController.getZkClient().getData(collectionZkPath + "/" + resource, null, null);
+        byte[] bytes = zkController.getZkClient().getData(collectionZkPath + "/" + resource, null, null, true);
         return new ByteArrayInputStream(bytes);
       }
     } catch (Exception e) {
@@ -105,7 +105,7 @@ public class ZkSolrResourceLoader extend
   public String[] listConfigDir() {
     List<String> list;
     try {
-      list = zkController.getZkClient().getChildren(collectionZkPath, null);
+      list = zkController.getZkClient().getChildren(collectionZkPath, null, true);
     } catch (InterruptedException e) {
       // Restore the interrupted status
       Thread.currentThread().interrupt();
@@ -119,5 +119,9 @@ public class ZkSolrResourceLoader extend
     }
     return list.toArray(new String[0]);
   }
+
+  /** Returns the ZooKeeper path under which this loader looks up resources. */
+  public String getCollectionZkPath() {
+    return collectionZkPath;
+  }
   
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java?rev=1235888&r1=1235887&r2=1235888&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/Config.java Wed Jan 25 19:49:26 2012
@@ -242,7 +242,10 @@ public class Config {
 
   public String get(String path, String def) {
     String val = getVal(path, false);
-    return val!=null ? val : def;
+    if (val == null || val.length() == 0) {
+      return def;
+    }
+    return val;
   }
 
   public int getInt(String path) {