You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by si...@apache.org on 2012/06/08 07:21:29 UTC

svn commit: r1347880 - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/core/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/

Author: siren
Date: Fri Jun  8 05:21:28 2012
New Revision: 1347880

URL: http://svn.apache.org/viewvc?rev=1347880&view=rev
Log:
SOLR-3511 refactor Overseer to use a queue

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java   (with props)
Removed:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/NodeStateWatcher.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/NodeStateWatcherTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CoreState.java
Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/SolrLogFormatter.java Fri Jun  8 05:21:28 2012
@@ -19,13 +19,14 @@ package org.apache.solr;
 
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.CoreState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrRequestInfo;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.WeakHashMap;
@@ -94,15 +95,11 @@ public class SolrLogFormatter extends Fo
     methodAlias.put(new Method("org.apache.solr.update.processor.LogUpdateProcessor","finish"), "");
   }
 
-
-
-
   public static class CoreInfo {
-    public static int maxCoreNum;
-    public String shortId;
-    public String url;
-    CoreState coreState;  // should be fine to keep a hard reference to this
-    // CloudState cloudState;  // should be fine to keep this hard reference since cloudstate is immutable and doesn't have pointers to anything heavyweight (like SolrCore, CoreContainer, etc)
+    static int maxCoreNum;
+    String shortId;
+    String url;
+    Map<String, String> coreProps;
   }
 
   Map<SolrCore, CoreInfo> coreInfoMap = new WeakHashMap<SolrCore, CoreInfo>();    // TODO: use something that survives across a core reload?
@@ -199,11 +196,15 @@ sb.append("(group_name=").append(tg.getN
           sb.append(" url="+info.url + " node="+zkController.getNodeName());
         }
 
-        // look to see if local core state changed
-        CoreState coreState = zkController.getCoreState(core.getName());
-        if (coreState != info.coreState) {
-          sb.append(" " + info.shortId + "_STATE=" + coreState);
-          info.coreState = coreState;
+        if(info.coreProps == null) {
+          info.coreProps = getCoreProps(zkController, core);
+        }
+
+        Map<String, String> coreProps = getCoreProps(zkController, core);
+        if(!coreProps.equals(info.coreProps)) {
+          info.coreProps = coreProps;
+          final String corePropsString = "coll:" + core.getCoreDescriptor().getCloudDescriptor().getCollectionName() + " core:" + core.getName() + " props:" + coreProps;
+          sb.append(" " + info.shortId + "_STATE=" + corePropsString);
         }
       }
     }
@@ -260,6 +261,16 @@ sb.append("(group_name=").append(tg.getN
     return sb.toString();
   }
 
+  /**
+   * Returns this core's properties from the cluster state, or an empty map
+   * when getShardProps has no entry for this core's node name.
+   */
+  private Map<String,String> getCoreProps(ZkController zkController, SolrCore core) {
+    final String collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
+    ZkNodeProps props = zkController.getCloudState().getShardProps(collection,  ZkStateReader.getCoreNodeName(zkController.getNodeName(), core.getName()));
+    return props != null ? props.getProperties() : Collections.<String,String>emptyMap();
+  }
+
   private Method classAndMethod = new Method(null,null); // don't need to be thread safe
   private String getShortClassName(String name, String method) {
     classAndMethod.className = name;

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1347880&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Fri Jun  8 05:21:28 2012
@@ -0,0 +1,312 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A distributed FIFO queue backed by ZooKeeper, following the queue recipe
+ * from the ZooKeeper documentation.
+ * <p>
+ * Every element is a child znode of {@code dir} created with
+ * {@link CreateMode#PERSISTENT_SEQUENTIAL} under the name prefix
+ * {@code "qn-"}; the sequence number ZooKeeper appends to the name defines the
+ * queue order. Children whose names do not follow that pattern are logged and
+ * ignored.
+ */
+public class DistributedQueue {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DistributedQueue.class);
+
+  /** Name prefix of every element znode; ZooKeeper appends the sequence number. */
+  private static final String PREFIX = "qn-";
+
+  /** Path of the parent znode whose children are the queue elements. */
+  private final String dir;
+
+  private final ZooKeeper zookeeper;
+
+  /** ACL applied to every znode this queue creates (directory and elements). */
+  private final List<ACL> acl;
+
+  /**
+   * @param zookeeper
+   *          client used for every queue operation
+   * @param dir
+   *          path of the queue's parent znode; {@link #offer(byte[])} and
+   *          {@link #take()} create it if it does not exist
+   * @param acl
+   *          ACL for created znodes, or {@code null} for
+   *          {@link ZooDefs.Ids#OPEN_ACL_UNSAFE}
+   */
+  public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+    this.dir = dir;
+    this.acl = (acl != null) ? acl : ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    this.zookeeper = zookeeper;
+  }
+
+  /**
+   * Returns the element znodes keyed by sequence number, so iteration order is
+   * queue order.
+   *
+   * @param watcher
+   *          optional watcher registered by the getChildren() call; may be
+   *          {@code null}
+   * @return map from sequence number to child node name
+   * @throws KeeperException.NoNodeException
+   *           if the queue directory does not exist
+   */
+  private TreeMap<Long,String> orderedChildren(Watcher watcher)
+      throws KeeperException, InterruptedException {
+    // NoNodeException propagates: each caller decides whether a missing
+    // directory means "empty queue" or "create it".
+    List<String> childNames = zookeeper.getChildren(dir, watcher);
+
+    TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();
+    for (String childName : childNames) {
+      if (!childName.startsWith(PREFIX)) {
+        LOG.warn("Found child node with improper name: " + childName);
+        continue;
+      }
+      try {
+        long childId = Long.parseLong(childName.substring(PREFIX.length()));
+        orderedChildren.put(childId, childName);
+      } catch (NumberFormatException e) {
+        LOG.warn("Found child node with improper format : " + childName + " "
+            + e, e);
+      }
+    }
+    return orderedChildren;
+  }
+
+  /**
+   * Creates the queue directory, tolerating a concurrent creation by another
+   * client.
+   */
+  private void createDir() throws KeeperException, InterruptedException {
+    try {
+      zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+    } catch (KeeperException.NodeExistsException e) {
+      // someone else created it first, which is fine
+    }
+  }
+
+  /**
+   * Returns the head of the queue without modifying the queue.
+   *
+   * @return the data at the head of the queue
+   * @throws NoSuchElementException
+   *           if the queue is empty or its directory does not exist
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] element() throws NoSuchElementException, KeeperException,
+      InterruptedException {
+    // element, take, and remove follow the same pattern.
+    // We want to return the child node with the smallest sequence number.
+    // Since other clients are remove()ing and take()ing nodes concurrently,
+    // the child with the smallest sequence number in orderedChildren might be
+    // gone by the time we check.
+    // We don't call getChildren again until we have tried the rest of the nodes
+    // in sequence order.
+    while (true) {
+      TreeMap<Long,String> orderedChildren;
+      try {
+        orderedChildren = orderedChildren(null);
+      } catch (KeeperException.NoNodeException e) {
+        throw new NoSuchElementException();
+      }
+      if (orderedChildren.isEmpty()) {
+        throw new NoSuchElementException();
+      }
+
+      for (String headNode : orderedChildren.values()) {
+        try {
+          return zookeeper.getData(dir + "/" + headNode, false, null);
+        } catch (KeeperException.NoNodeException e) {
+          // Another client removed the node first, try next
+        }
+      }
+      // every listed child was removed concurrently: list again
+    }
+  }
+
+  /**
+   * Attempts to remove the head of the queue and return it.
+   *
+   * @return the former head of the queue
+   * @throws NoSuchElementException
+   *           if the queue is empty or its directory does not exist
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] remove() throws NoSuchElementException, KeeperException,
+      InterruptedException {
+    // Same retry pattern as element(), but the head is also deleted.
+    while (true) {
+      TreeMap<Long,String> orderedChildren;
+      try {
+        orderedChildren = orderedChildren(null);
+      } catch (KeeperException.NoNodeException e) {
+        throw new NoSuchElementException();
+      }
+      if (orderedChildren.isEmpty()) {
+        throw new NoSuchElementException();
+      }
+
+      for (String headNode : orderedChildren.values()) {
+        String path = dir + "/" + headNode;
+        try {
+          byte[] data = zookeeper.getData(path, false, null);
+          zookeeper.delete(path, -1);
+          return data;
+        } catch (KeeperException.NoNodeException e) {
+          // Another client deleted the node first.
+        }
+      }
+    }
+  }
+
+  /** Watcher that releases a one-shot latch the first time it is notified. */
+  private static class LatchChildWatcher implements Watcher {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    @Override
+    public void process(WatchedEvent event) {
+      LOG.debug("Watcher fired on path: " + event.getPath() + " state: "
+          + event.getState() + " type " + event.getType());
+      latch.countDown();
+    }
+
+    public void await() throws InterruptedException {
+      latch.await();
+    }
+  }
+
+  /**
+   * Removes the head of the queue and returns it, blocking until an element
+   * is available. Creates the queue directory if it does not exist.
+   *
+   * @return the former head of the queue
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] take() throws KeeperException, InterruptedException {
+    // Same retry pattern as remove(), but waits instead of failing when empty.
+    while (true) {
+      LatchChildWatcher childWatcher = new LatchChildWatcher();
+      TreeMap<Long,String> orderedChildren;
+      try {
+        orderedChildren = orderedChildren(childWatcher);
+      } catch (KeeperException.NoNodeException e) {
+        // Another taker or an offerer may create the directory concurrently;
+        // createDir() tolerates that instead of failing with NodeExists.
+        createDir();
+        continue;
+      }
+      if (orderedChildren.isEmpty()) {
+        // Wait for the child watch registered above, then list again.
+        childWatcher.await();
+        continue;
+      }
+
+      for (String headNode : orderedChildren.values()) {
+        String path = dir + "/" + headNode;
+        try {
+          byte[] data = zookeeper.getData(path, false, null);
+          zookeeper.delete(path, -1);
+          return data;
+        } catch (KeeperException.NoNodeException e) {
+          // Another client deleted the node first.
+        }
+      }
+    }
+  }
+
+  /**
+   * Inserts data into the queue as a new sequential element, creating the
+   * queue directory first if necessary.
+   *
+   * @param data
+   *          element payload
+   * @return always {@code true}; failures surface as exceptions
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public boolean offer(byte[] data) throws KeeperException,
+      InterruptedException {
+    while (true) {
+      try {
+        zookeeper.create(dir + "/" + PREFIX, data, acl,
+            CreateMode.PERSISTENT_SEQUENTIAL);
+        return true;
+      } catch (KeeperException.NoNodeException e) {
+        // the queue directory does not exist yet: create it and retry
+        createDir();
+      }
+    }
+  }
+
+  /**
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty.
+   *
+   * @return data at the first element of the queue, or null.
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] peek() throws KeeperException, InterruptedException {
+    try {
+      return element();
+    } catch (NoSuchElementException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Attempts to remove the head of the queue and return it. Returns null if the
+   * queue is empty.
+   *
+   * @return Head of the queue or null.
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public byte[] poll() throws KeeperException, InterruptedException {
+    try {
+      return remove();
+    } catch (NoSuchElementException e) {
+      return null;
+    }
+  }
+
+}

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Jun  8 05:21:28 2012
@@ -8,7 +8,6 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.CloudState;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkClientConnectionStrategy;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -96,6 +95,13 @@ class ShardLeaderElectionContextBase ext
           leaderProps == null ? null : ZkStateReader.toJSON(leaderProps),
           CreateMode.EPHEMERAL, true);
     }
+    
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+        "leader", ZkStateReader.SHARD_ID_PROP, shardId,
+        ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.BASE_URL_PROP,
+        leaderProps.getProperties().get(ZkStateReader.BASE_URL_PROP),
+        ZkStateReader.CORE_NAME_PROP, leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP));
+    Overseer.getInQueue(zkClient).offer(ZkStateReader.toJSON(m));
   } 
 
 }
@@ -240,10 +246,10 @@ final class OverseerElectionContext exte
   private final SolrZkClient zkClient;
   private final ZkStateReader stateReader;
 
-  public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, ZkStateReader stateReader) {
+  public OverseerElectionContext(final String zkNodeName, ZkStateReader stateReader) {
     super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient());
-    this.zkClient = zkClient;
     this.stateReader = stateReader;
+    this.zkClient = stateReader.getZkClient();
   }
 
   @Override
@@ -265,7 +271,7 @@ final class OverseerElectionContext exte
           CreateMode.EPHEMERAL, true);
     }
   
-    new Overseer(zkClient, stateReader, id);
+    new Overseer(stateReader, id);
   }
   
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Jun  8 05:21:28 2012
@@ -17,157 +17,169 @@ package org.apache.solr.cloud;
  * the License.
  */
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.Set;
 
-import org.apache.solr.cloud.NodeStateWatcher.NodeStateChangeListener;
-import org.apache.solr.cloud.ShardLeaderWatcher.ShardLeaderListener;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.CoreState;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkOperation;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Cluster leader. Responsible node assignments, cluster state file?
  */
-public class Overseer implements NodeStateChangeListener, ShardLeaderListener {
+public class Overseer {
+  public static final String QUEUE_OPERATION = "operation";
 
   private static final int STATE_UPDATE_DELAY = 500;  // delay between cloud state updates
 
-  static enum Op {
-    LeaderChange, StateChange, CoreDeleted; 
-  }
-
-  private final class CloudStateUpdateRequest {
-    
-    final Op operation;
-    final Object[] args;
-    
-     CloudStateUpdateRequest(final Op operation, final Object... args) {
-       this.operation = operation;
-       this.args = args;
-    }
-  }
-  
-  public static final String STATES_NODE = "/node_states";
   private static Logger log = LoggerFactory.getLogger(Overseer.class);
   
-  private final SolrZkClient zkClient;
-  
-  // pooled updates
-  private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo = new LinkedBlockingQueue<CloudStateUpdateRequest>();
-  
-  // node stateWatches
-  private HashMap<String,NodeStateWatcher> nodeStateWatches = new HashMap<String,NodeStateWatcher>();
-
-  // shard leader watchers  (collection->slice->watcher)
-  private HashMap<String, HashMap<String,ShardLeaderWatcher>> shardLeaderWatches = new HashMap<String,HashMap<String,ShardLeaderWatcher>>();
-  private ZkCmdExecutor zkCmdExecutor;
-
   private static class CloudStateUpdater implements Runnable {
     
-    private final LinkedBlockingQueue<CloudStateUpdateRequest> fifo;
     private final ZkStateReader reader;
     private final SolrZkClient zkClient;
     private final String myId;
-    
-    public CloudStateUpdater(final LinkedBlockingQueue<CloudStateUpdateRequest> fifo, final ZkStateReader reader, final SolrZkClient zkClient, final String myId) {
-      this.fifo = fifo;
+    //queue where everybody can throw tasks
+    private final DistributedQueue stateUpdateQueue; 
+    //Internal queue where overseer stores events that have not yet been published into cloudstate
+    //If Overseer dies while extracting the main queue a new overseer will start from this queue 
+    private final DistributedQueue workQueue;
+    
+    public CloudStateUpdater(final ZkStateReader reader, final String myId) {
+      this.zkClient = reader.getZkClient();
+      this.stateUpdateQueue = getInQueue(zkClient);
+      this.workQueue = getInternalQueue(zkClient);
       this.myId = myId;
       this.reader = reader;
-      this.zkClient = zkClient;
     }
-      @Override
-      public void run() {
-        while (amILeader()) {
-          
-          
-          LinkedList<CloudStateUpdateRequest> requests = new LinkedList<Overseer.CloudStateUpdateRequest>();
-          while (!fifo.isEmpty()) {
-            // collect all queued requests
-            CloudStateUpdateRequest req;
-            req = fifo.poll();
-            if (req == null) {
-              break;
-            }
-            requests.add(req);
-          }
-
-          if (requests.size() > 0) {
-            // process updates
-            synchronized (reader.getUpdateLock()) {
-              try {
+    
+    @Override
+    public void run() {
+        
+      if(amILeader()) {
+        // see if there's something left from the previous Overseer and re
+        // process all events that were not persisted into cloud state
+          synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
+            try {
+              byte[] head = workQueue.peek();
+              
+              if (head != null) {
                 reader.updateCloudState(true);
                 CloudState cloudState = reader.getCloudState();
-                for (CloudStateUpdateRequest request : requests) {
-
-                  switch (request.operation) {
-                  case LeaderChange:
-                    cloudState = setShardLeader(cloudState,
-                        (String) request.args[0], (String) request.args[1],
-                        (String) request.args[2]);
-
-                    break;
-                  case StateChange:
-                    cloudState = updateState(cloudState,
-                        (String) request.args[0], (CoreState) request.args[1]);
-                    break;
-
-                  case CoreDeleted:
-                    cloudState = removeCore(cloudState, (String) request.args[0], (String) request.args[1]);
-                    break;
-                  }
+                log.info("Replaying operations from work queue.");
+                
+                while (head != null) {
+                  final ZkNodeProps message = ZkNodeProps.load(head);
+                  final String operation = message
+                      .get(QUEUE_OPERATION);
+                  cloudState = processMessage(cloudState, message, operation);
+                  zkClient.setData(ZkStateReader.CLUSTER_STATE,
+                      ZkStateReader.toJSON(cloudState), true);
+                  workQueue.remove();
+                  head = workQueue.peek();
                 }
-
-                log.info("Announcing new cluster state");
-                zkClient.setData(ZkStateReader.CLUSTER_STATE,
-                    ZkStateReader.toJSON(cloudState), true);
-
-              } catch (KeeperException e) {
-                if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                    || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                  log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                  return;
-                }
-                SolrException.log(log, "", e);
-                throw new ZooKeeperException(
-                    SolrException.ErrorCode.SERVER_ERROR, "", e);
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+              }
+            } catch (KeeperException e) {
+              if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+                log.warn("Solr cannot talk to ZK");
                 return;
               }
+              SolrException.log(log, "", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              return;
             }
           }
-
+        }
+      
+      log.info("Starting to work on the main queue");
+      while (amILeader()) {
+        synchronized (reader.getUpdateLock()) {
           try {
-            Thread.sleep(STATE_UPDATE_DELAY);
+            byte[] head = stateUpdateQueue.peek();
+            
+            if (head != null) {
+              reader.updateCloudState(true);
+              CloudState cloudState = reader.getCloudState();
+              
+              while (head != null) {
+                final ZkNodeProps message = ZkNodeProps.load(head);
+                final String operation = message.get(QUEUE_OPERATION);
+                
+                cloudState = processMessage(cloudState, message, operation);
+                byte[] processed = stateUpdateQueue.remove();
+                workQueue.offer(processed);
+                head = stateUpdateQueue.peek();
+              }
+              zkClient.setData(ZkStateReader.CLUSTER_STATE,
+                  ZkStateReader.toJSON(cloudState), true);
+            }
+            // clean work queue
+            while (workQueue.poll() != null);
+            
+          } catch (KeeperException e) {
+            if (e.code() == KeeperException.Code.SESSIONEXPIRED
+                || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+              log.warn("Overseer cannot talk to ZK");
+              return;
+            }
+            SolrException.log(log, "", e);
+            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                "", e);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
+            return;
           }
         }
+        
+        try {
+          Thread.sleep(STATE_UPDATE_DELAY);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    private CloudState processMessage(CloudState cloudState,
+        final ZkNodeProps message, final String operation)
+        throws KeeperException, InterruptedException {
+      if ("state".equals(operation)) {
+        cloudState = updateState(cloudState, message);
+      } else if ("deletecore".equals(operation)) {
+        cloudState = removeCore(cloudState, message);
+      } else if (ZkStateReader.LEADER_PROP.equals(operation)) {
+        StringBuilder sb = new StringBuilder();
+        String baseUrl = message.get(ZkStateReader.BASE_URL_PROP);
+        String coreName = message.get(ZkStateReader.CORE_NAME_PROP);
+        sb.append(baseUrl);
+        if (!baseUrl.endsWith("/")) sb.append("/");
+        sb.append(coreName == null ? "" : coreName);
+        if (!(sb.substring(sb.length() - 1).equals("/"))) sb
+            .append("/");
+        cloudState = setShardLeader(cloudState,
+            message.get(ZkStateReader.COLLECTION_PROP),
+            message.get(ZkStateReader.SHARD_ID_PROP), sb.toString());
+      } else {
+        throw new RuntimeException("unknown operation:" + operation
+            + " contents:" + message.getProperties());
       }
+      return cloudState;
+    }
       
       private boolean amILeader() {
         try {
@@ -188,32 +200,35 @@ public class Overseer implements NodeSta
        * @throws KeeperException 
        * @throws InterruptedException 
        */
-      private CloudState updateState(CloudState state, String nodeName, CoreState coreState) throws KeeperException, InterruptedException {
-        String collection = coreState.getCollectionName();
-        String zkCoreNodeName = coreState.getCoreNodeName();
+      private CloudState updateState(CloudState state, final ZkNodeProps message) throws KeeperException, InterruptedException {
+        final String collection = message.get(ZkStateReader.COLLECTION_PROP);
+        final String zkCoreNodeName = message.get(ZkStateReader.NODE_NAME_PROP) + "_" + message.get(ZkStateReader.CORE_NAME_PROP);
+        final Integer numShards = message.get(ZkStateReader.NUM_SHARDS_PROP)!=null?Integer.parseInt(message.get(ZkStateReader.NUM_SHARDS_PROP)):null;
         
         //collection does not yet exist, create placeholders if num shards is specified
-        if (!state.getCollections().contains(coreState.getCollectionName())
-            && coreState.getNumShards() != null) {
-          state = createCollection(state, collection, coreState.getNumShards());
+        if (!state.getCollections().contains(collection)
+            && numShards!=null) {
+          state = createCollection(state, collection, numShards);
         }
         
         // use the provided non null shardId
-        String shardId = coreState.getProperties().get(ZkStateReader.SHARD_ID_PROP);
-        if(shardId==null) {
-          //use shardId from CloudState
-          shardId = getAssignedId(state, nodeName, coreState);
+        String shardId = message.get(ZkStateReader.SHARD_ID_PROP);
+        if (shardId == null) {
+          String nodeName = message.get(ZkStateReader.NODE_NAME_PROP);
+          //get shardId from CloudState
+          shardId = getAssignedId(state, nodeName, message);
         }
-        if(shardId==null) {
+        if(shardId == null) {
           //request new shardId 
-          shardId = AssignShard.assignShard(collection, state, coreState.getNumShards());
+          shardId = AssignShard.assignShard(collection, state, numShards);
         }
           
           Map<String,String> props = new HashMap<String,String>();
-          Map<String,String> coreProps = new HashMap<String,String>(coreState.getProperties().size());
-          coreProps.putAll(coreState.getProperties());
+          Map<String,String> coreProps = new HashMap<String,String>(message.getProperties().size());
+          coreProps.putAll(message.getProperties());
           // we don't put num_shards in the clusterstate
-          coreProps.remove("num_shards");
+          coreProps.remove(ZkStateReader.NUM_SHARDS_PROP);
+          coreProps.remove(QUEUE_OPERATION);
           for (Entry<String,String> entry : coreProps.entrySet()) {
             props.put(entry.getKey(), entry.getValue());
           }
@@ -249,9 +264,9 @@ public class Overseer implements NodeSta
        * Return an already assigned id or null if not assigned
        */
       private String getAssignedId(final CloudState state, final String nodeName,
-          final CoreState coreState) {
-        final String key = coreState.getProperties().get(ZkStateReader.NODE_NAME_PROP) + "_" +  coreState.getProperties().get(ZkStateReader.CORE_NAME_PROP);
-        Map<String, Slice> slices = state.getSlices(coreState.getCollectionName());
+          final ZkNodeProps coreState) {
+        final String key = coreState.get(ZkStateReader.NODE_NAME_PROP) + "_" +  coreState.get(ZkStateReader.CORE_NAME_PROP);
+        Map<String, Slice> slices = state.getSlices(coreState.get(ZkStateReader.COLLECTION_PROP));
         if (slices != null) {
           for (Slice slice : slices.values()) {
             if (slice.getShards().get(key) != null) {
@@ -303,12 +318,12 @@ public class Overseer implements NodeSta
         final Map<String, Slice> slices = newStates.get(collection);
 
         if(slices==null) {
-          log.error("Could not mark shard leader for non existing collection.");
+          log.error("Could not mark shard leader for non existing collection:" + collection);
           return state;
         }
         
         if (!slices.containsKey(sliceName)) {
-          log.error("Could not mark leader for non existing slice.");
+          log.error("Could not mark leader for non existing slice:" + sliceName);
           return state;
         } else {
           final Map<String,ZkNodeProps> newShards = new LinkedHashMap<String,ZkNodeProps>();
@@ -333,7 +348,11 @@ public class Overseer implements NodeSta
       /*
        * Remove core from cloudstate
        */
-      private CloudState removeCore(final CloudState cloudState, final String collection, final String coreNodeName) {
+      private CloudState removeCore(final CloudState cloudState, ZkNodeProps message) {
+        
+        final String coreNodeName = message.get(ZkStateReader.NODE_NAME_PROP) + "_" + message.get(ZkStateReader.CORE_NAME_PROP);
+        final String collection = message.get(ZkStateReader.COLLECTION_PROP);
+
         final LinkedHashMap<String, Map<String, Slice>> newStates = new LinkedHashMap<String,Map<String,Slice>>();
         for(String collectionName: cloudState.getCollections()) {
           if(collection.equals(collectionName)) {
@@ -360,255 +379,42 @@ public class Overseer implements NodeSta
      }
   }
   
-  public Overseer(final SolrZkClient zkClient, final ZkStateReader reader, String id) throws KeeperException, InterruptedException {
-    log.info("Constructing new Overseer id=" + id);
-    this.zkClient = zkClient;
-    this.zkCmdExecutor = new ZkCmdExecutor();
-    createWatches();
-    
+  public Overseer(final ZkStateReader reader, final String id) throws KeeperException, InterruptedException {
+    log.info("Overseer (id=" + id + ") starting");
     //launch cluster state updater thread
-    ThreadGroup tg = new ThreadGroup("Overseer delayed state updater");
-    Thread updaterThread = new Thread(tg, new CloudStateUpdater(fifo, reader, zkClient, id));
+    ThreadGroup tg = new ThreadGroup("Overseer state updater.");
+    Thread updaterThread = new Thread(tg, new CloudStateUpdater(reader, id)); // made a daemon below, so it does not keep the JVM alive on its own
     updaterThread.setDaemon(true);
     updaterThread.start();
   }
-  
-  public synchronized void createWatches()
-      throws KeeperException, InterruptedException {
-    addCollectionsWatch();
-    addLiveNodesWatch();
-  }
-
-  /* 
-   * Watch for collections so we can add watches for its shard leaders.
-   */
-  private void addCollectionsWatch() throws KeeperException,
-      InterruptedException {
-    
-    zkCmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
-    
-    List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, new Watcher(){
-      @Override
-      public void process(WatchedEvent event) {
-        try {
-          List<String> collections = zkClient.getChildren(ZkStateReader.COLLECTIONS_ZKNODE, this, true);
-          collectionsChanged(collections);
-        } catch (KeeperException e) {
-            if (e.code() == Code.CONNECTIONLOSS || e.code() == Code.SESSIONEXPIRED) {
-            log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-            return;
-          }
-        } catch (InterruptedException e) {
-          // Restore the interrupted status
-          Thread.currentThread().interrupt();
-          log.warn("", e);
-        }
-      }
-    }, true);
-    
-    collectionsChanged(collections);
-  }
-  
-  private void collectionsChanged(Collection<String> collections) throws KeeperException, InterruptedException {
-    synchronized (shardLeaderWatches) {
-      for(String collection: collections) {
-        if(!shardLeaderWatches.containsKey(collection)) {
-          shardLeaderWatches.put(collection, new HashMap<String,ShardLeaderWatcher>());
-          addShardLeadersWatch(collection);
-        }
-      }
-      //XXX not handling delete collections..
-    }
-  }
 
   /**
-   * Add a watch for node containing shard leaders for a collection
-   * @param collection
-   * @throws KeeperException
-   * @throws InterruptedException
+   * Get queue that can be used to send messages to Overseer.
    */
-  private void addShardLeadersWatch(final String collection) throws KeeperException,
-      InterruptedException {
-    
-    zkCmdExecutor.ensureExists(ZkStateReader.getShardLeadersPath(collection, null), zkClient);
-    
-    final List<String> leaderNodes = zkClient.getChildren(
-        ZkStateReader.getShardLeadersPath(collection, null), new Watcher() {
-          
-          @Override
-          public void process(WatchedEvent event) {
-            try {
-              List<String> leaderNodes = zkClient.getChildren(
-                  ZkStateReader.getShardLeadersPath(collection, null), this, true);
-              
-              processLeaderNodesChanged(collection, leaderNodes);
-            } catch (KeeperException e) {
-              if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                  || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                return;
-              }
-              SolrException.log(log, "", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (InterruptedException e) {
-              // Restore the interrupted status
-              Thread.currentThread().interrupt();
-            }
-          }
-        }, true);
-    
-    processLeaderNodesChanged(collection, leaderNodes);
-  }
-
-  /**
-   * Process change in shard leaders. Make sure we have watches for each leader.
-   */
-  private void processLeaderNodesChanged(final String collection, final Collection<String> shardIds) {
-    if(log.isInfoEnabled()) {
-      log.info("Leader nodes changed for collection: " + collection + " nodes now:" + shardIds);
-    }
-    
-    Map<String, ShardLeaderWatcher> watches = shardLeaderWatches.get(collection);
-    Set<String> currentWatches = new HashSet<String>();
-    currentWatches.addAll(watches.keySet());
-    
-    Set<String> newLeaders = complement(shardIds, currentWatches);
-
-    Set<String> lostLeaders = complement(currentWatches, shardIds);
-    //remove watches for lost shards
-    for (String shardId : lostLeaders) {
-      ShardLeaderWatcher watcher = watches.remove(shardId);
-      if (watcher != null) {
-        watcher.close();
-      }
-    }
-    
-    //add watches for the new shards
-    for(String shardId: newLeaders) {
-      try {
-        ShardLeaderWatcher watcher = new ShardLeaderWatcher(shardId, collection, zkClient, this);
-        watches.put(shardId, watcher);
-      } catch (KeeperException e) {
-        log.error("Failed to create watcher for shard leader col:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.error("Failed to create watcher for shard leader col:" + collection + " shard:" + shardId + ", exception: " + e.getClass());
-      }
+  public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient); // the queue lives under /overseer, so make sure that parent node exists first
+    return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue", null);
+  }
+
+  /* Internal work queue ("/overseer/queue-work"), not to be used outside of Overseer */
+  static DistributedQueue getInternalQueue(final SolrZkClient zkClient) {
+    createOverseerNode(zkClient); // same /overseer parent node as the public queue
+    return new DistributedQueue(zkClient.getSolrZooKeeper(), "/overseer/queue-work", null);
+  }
+  /** Create the persistent "/overseer" node that parents both Overseer queues; a no-op if it already exists. */
+  private static void createOverseerNode(final SolrZkClient zkClient) {
+    try {
+      zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);
+    } catch (KeeperException.NodeExistsException e) {
+      // already there: nothing to do
+    } catch (InterruptedException e) {
+      log.error("Could not create Overseer node", e);
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (KeeperException e) {
+      log.error("Could not create Overseer node", e);
+      throw new RuntimeException(e);
     }
   }
 
-  private void addLiveNodesWatch() throws KeeperException,
-      InterruptedException {
-    List<String> liveNodes = zkCmdExecutor.retryOperation(new ZkOperation() {
-      
-      @Override
-      public Object execute() throws KeeperException, InterruptedException {
-        return zkClient.getChildren(
-            ZkStateReader.LIVE_NODES_ZKNODE, new Watcher() {
-              
-              @Override
-              public void process(WatchedEvent event) {
-                try {
-                    List<String> liveNodes = zkClient.getChildren(
-                        ZkStateReader.LIVE_NODES_ZKNODE, this, true);
-                    synchronized (nodeStateWatches) {
-                      processLiveNodesChanged(nodeStateWatches.keySet(), liveNodes);
-                    }
-                } catch (KeeperException e) {
-                  if (e.code() == KeeperException.Code.SESSIONEXPIRED
-                      || e.code() == KeeperException.Code.CONNECTIONLOSS) {
-                    log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
-                    return;
-                  }
-                  SolrException.log(log, "", e);
-                  throw new ZooKeeperException(
-                      SolrException.ErrorCode.SERVER_ERROR, "", e);
-                } catch (InterruptedException e) {
-                  // Restore the interrupted status
-                  Thread.currentThread().interrupt();
-                }
-              }
-            }, true);
-      }
-    });
-    
-    processLiveNodesChanged(Collections.<String>emptySet(), liveNodes);
-  }
-  
-  private void processLiveNodesChanged(Collection<String> oldLiveNodes,
-      Collection<String> liveNodes) throws InterruptedException, KeeperException {
-    
-    Set<String> upNodes = complement(liveNodes, oldLiveNodes);
-    if (upNodes.size() > 0) {
-      addNodeStateWatches(upNodes);
-    }
-    
-    Set<String> downNodes = complement(oldLiveNodes, liveNodes);
-    for(String node: downNodes) {
-      synchronized (nodeStateWatches) {
-        NodeStateWatcher watcher = nodeStateWatches.remove(node);
-      }
-      log.debug("Removed NodeStateWatcher for node:" + node);
-    }
-  }
-  
-  private void addNodeStateWatches(Set<String> nodeNames) throws InterruptedException, KeeperException {
-    
-    for (String nodeName : nodeNames) {
-      final String path = STATES_NODE + "/" + nodeName;
-      synchronized (nodeStateWatches) {
-        if (!nodeStateWatches.containsKey(nodeName)) {
-          zkCmdExecutor.ensureExists(path, zkClient);
-          nodeStateWatches.put(nodeName, new NodeStateWatcher(zkClient, nodeName, path, this));
-          log.debug("Added NodeStateWatcher for node " + nodeName);
-        } else {
-          log.debug("watch already added");
-        }
-      }
-    }
-  }
-  
-  private Set<String> complement(Collection<String> next,
-      Collection<String> prev) {
-    Set<String> downCollections = new HashSet<String>();
-    downCollections.addAll(next);
-    downCollections.removeAll(prev);
-    return downCollections;
-  }
-
-  @Override
-  public void coreChanged(final String nodeName, final Set<CoreState> states)
-      throws KeeperException, InterruptedException {
-    log.info("Core change pooled: " + nodeName + " states:" + states);
-    for (CoreState state : states) {
-      fifo.add(new CloudStateUpdateRequest(Op.StateChange, nodeName, state));
-    }
-  }
-
-  @Override
-  public void coreDeleted(String nodeName, Collection<CoreState> states)
-      throws KeeperException, InterruptedException {
-    for (CoreState state : states) {
-      fifo.add(new CloudStateUpdateRequest(Op.CoreDeleted, state.getCollectionName(), state.getCoreNodeName()));
-    }
-  }
-
-  public static void createClientNodes(SolrZkClient zkClient, String nodeName) throws KeeperException, InterruptedException {
-    final String node = STATES_NODE + "/" + nodeName;
-    if (log.isInfoEnabled()) {
-      log.info("creating node:" + node);
-    }
-    
-    ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor();
-    zkCmdExecutor.ensureExists(node, zkClient);
-  }
-
-  @Override
-  public void announceLeader(String collection, String shardId, ZkCoreNodeProps props) {
-    String coreUrl = props.getCoreUrl();
-    log.info("Leader change pooled: " + coreUrl);
-    fifo.add(new CloudStateUpdateRequest(Op.LeaderChange, collection, shardId, coreUrl));
-  }
-  
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java Fri Jun  8 05:21:28 2012
@@ -36,6 +36,7 @@ import org.apache.solr.common.cloud.Safe
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
@@ -52,6 +53,7 @@ import org.apache.solr.update.PeerSync;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateLog.RecoveryInfo;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,11 +100,10 @@ public class RecoveryStrategy extends Th
   
   private void recoveryFailed(final SolrCore core,
       final ZkController zkController, final String baseUrl,
-      final String shardZkNodeName, final CoreDescriptor cd) {
+      final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
     SolrException.log(log, "Recovery failed - I give up.");
     try {
-      zkController.publishAsRecoveryFailed(baseUrl, cd,
-          shardZkNodeName, core.getName());
+      zkController.publish(cd, ZkStateReader.RECOVERY_FAILED);
     } finally {
       close();
     }
@@ -208,7 +209,18 @@ public class RecoveryStrategy extends Th
 
       log.info("Starting recovery process. recoveringAfterStartup=" + recoveringAfterStartup);
 
-      doRecovery(core);
+      try {
+        doRecovery(core);
+      } catch (KeeperException e) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        SolrException.log(log, "", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
+            e);
+      }
     } finally {
       if (core != null) core.close();
       SolrRequestInfo.clearRequestInfo();
@@ -216,7 +228,7 @@ public class RecoveryStrategy extends Th
   }
 
   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
-  public void doRecovery(SolrCore core) {
+  public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
     boolean replayed = false;
     boolean successfulRecovery = false;
 
@@ -327,8 +339,8 @@ public class RecoveryStrategy extends Th
             // }
 
             // sync success - register as active and return
-            zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
-                coreZkNodeName, coreName);
+            zkController.publish(core.getCoreDescriptor(),
+                ZkStateReader.ACTIVE);
             successfulRecovery = true;
             close = true;
             return;
@@ -352,8 +364,7 @@ public class RecoveryStrategy extends Th
 
           log.info("Recovery was successful - registering as Active");
           // if there are pending recovery requests, don't advert as active
-          zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
-              coreZkNodeName, coreName);
+          zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
           close = true;
           successfulRecovery = true;
         } catch (InterruptedException e) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Jun  8 05:21:28 2012
@@ -39,7 +39,6 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.CoreState;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -85,6 +84,7 @@ public final class ZkController {
   private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
 
   private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
+  private final DistributedQueue overseerStatusQueue;
   
   // package private for tests
 
@@ -93,11 +93,6 @@ public final class ZkController {
   public final static String COLLECTION_PARAM_PREFIX="collection.";
   public final static String CONFIGNAME_PROP="configName";
 
-  private Map<String, CoreState> coreStates = new HashMap<String, CoreState>();   // key is the local core name
-  private long coreStatesVersion; // bumped by 1 each time we serialize coreStates... sync on  coreStates
-  private long coreStatesPublishedVersion; // last version published to ZK... sync on coreStatesPublishLock
-  private Object coreStatesPublishLock = new Object(); // only publish one at a time
-
   private final Map<String, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
   
   private SolrZkClient zkClient;
@@ -210,7 +205,7 @@ public final class ZkController {
               // seems we dont need to do this again...
               //Overseer.createClientNodes(zkClient, getNodeName());
 
-              ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+              ElectionContext context = new OverseerElectionContext(getNodeName(), zkStateReader);
               overseerElector.joinElection(context);
               zkStateReader.createClusterStateWatchersAndUpdate();
               
@@ -223,8 +218,7 @@ public final class ZkController {
                   final String coreZkNodeName = getNodeName() + "_"
                       + descriptor.getName();
                   try {
-                    publishAsDown(getBaseUrl(), descriptor, coreZkNodeName,
-                        descriptor.getName());
+                    publish(descriptor, ZkStateReader.DOWN);
                     waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
                   } catch (Exception e) {
                     SolrException.log(log, "", e);
@@ -261,6 +255,7 @@ public final class ZkController {
 
  
         });
+    this.overseerStatusQueue = Overseer.getInQueue(zkClient);
     cmdExecutor = new ZkCmdExecutor();
     leaderElector = new LeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient);
@@ -302,13 +297,6 @@ public final class ZkController {
     return zkStateReader.getCloudState();
   }
 
-  /** @return the CoreState for the core, which may not yet be visible to ZooKeeper or other nodes in the cluster */
-  public CoreState getCoreState(String coreName) {
-    synchronized (coreStates) {
-      return coreStates.get(coreName);
-    }
-  }
-
   /**
    * @param zkConfigName
    * @param fileName
@@ -387,14 +375,11 @@ public final class ZkController {
       // makes nodes zkNode
       cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
       
-      Overseer.createClientNodes(zkClient, getNodeName());
       createEphemeralLiveNode();
       cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
 
-      syncNodeState();
-
       overseerElector = new LeaderElector(zkClient);
-      ElectionContext context = new OverseerElectionContext(getNodeName(), zkClient, zkStateReader);
+      ElectionContext context = new OverseerElectionContext(getNodeName(), zkStateReader);
       overseerElector.setup(context);
       overseerElector.joinElection(context);
       zkStateReader.createClusterStateWatchersAndUpdate();
@@ -416,27 +401,6 @@ public final class ZkController {
     }
 
   }
-  
-  /*
-   * sync internal state with zk on startup
-   */
-  private void syncNodeState() throws KeeperException, InterruptedException {
-    log.debug("Syncing internal state with zk. Current: " + coreStates);
-    final String path = Overseer.STATES_NODE + "/" + getNodeName();
-
-    final byte[] data = zkClient.getData(path, null, null, true);
-
-    if (data != null) {
-      CoreState[] states = CoreState.load(data);
-      synchronized (coreStates) {
-        coreStates.clear();    // TODO: should we do this?
-        for(CoreState coreState: states) {
-          coreStates.put(coreState.getCoreName(), coreState);
-        }
-      }
-    }
-    log.debug("after sync: " + coreStates);
-  }
 
   public boolean isConnected() {
     return zkClient.isConnected();
@@ -640,7 +604,7 @@ public final class ZkController {
         boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
             collection, coreZkNodeName, shardId, leaderProps, core, cc);
         if (!didRecovery) {
-          publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+          publish(desc, ZkStateReader.ACTIVE);
         }
       } finally {
         if (core != null) {
@@ -648,7 +612,7 @@ public final class ZkController {
         }
       }
     } else {
-      publishAsActive(baseUrl, desc, coreZkNodeName, coreName);
+      publish(desc, ZkStateReader.ACTIVE);
     }
     
     // make sure we have an update cluster state right away
@@ -760,50 +724,34 @@ public final class ZkController {
     return baseURL;
   }
 
-
-  void publishAsActive(String shardUrl,
-      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
-    Map<String,String> finalProps = new HashMap<String,String>();
-    finalProps.put(ZkStateReader.BASE_URL_PROP, shardUrl);
-    finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
-    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
-
-    publishState(cd, shardZkNodeName, coreName, finalProps);
-  }
-
-  public void publish(CoreDescriptor cd, String state) {
-    Map<String,String> finalProps = new HashMap<String,String>();
-    finalProps.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
-    finalProps.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
-    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    finalProps.put(ZkStateReader.STATE_PROP, state);
-    publishState(cd, getNodeName() + "_" + cd.getName(),
-        cd.getName(), finalProps);
-  }
-  
-  void publishAsDown(String baseUrl,
-      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
-    Map<String,String> finalProps = new HashMap<String,String>();
-    finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
-    finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
-    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.DOWN);
- 
-    publishState(cd, shardZkNodeName, coreName, finalProps);
-  }
-  
-  void publishAsRecoveryFailed(String baseUrl,
-      final CoreDescriptor cd, String shardZkNodeName, String coreName) {
-    Map<String,String> finalProps = new HashMap<String,String>();
-    finalProps.put(ZkStateReader.BASE_URL_PROP, baseUrl);
-    finalProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
-    finalProps.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    finalProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERY_FAILED);
-    publishState(cd, shardZkNodeName, coreName, finalProps);
+  /**
+   * Publish core state to the Overseer by offering a "state" message to its queue.
+   * @param cd descriptor of the core whose state is published
+   * @param state the core's new state, e.g. ZkStateReader.ACTIVE, DOWN or RECOVERY_FAILED
+   * @throws KeeperException propagated from offering the message to the Overseer queue
+   * @throws InterruptedException propagated from offering the message to the Overseer queue
+   */
+  public void publish(final CoreDescriptor cd, final String state) throws KeeperException, InterruptedException {
+    // shard count: the CloudDescriptor's value, else the num_shards system property
+    Integer numShards = cd.getCloudDescriptor().getNumShards();
+    if (numShards == null) { //XXX sys prop hack
+      numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
+    }
+    
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state", 
+        ZkStateReader.STATE_PROP, state, 
+        ZkStateReader.BASE_URL_PROP, getBaseUrl(), 
+        ZkStateReader.CORE_NAME_PROP, cd.getName(),
+        ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles(),
+        ZkStateReader.NODE_NAME_PROP, getNodeName(),
+        ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId(),
+        ZkStateReader.COLLECTION_PROP,
+        cd.getCloudDescriptor().getCollectionName(),
+        ZkStateReader.NUM_SHARDS_PROP, numShards != null ? numShards.toString()
+            : null);
+    overseerStatusQueue.offer(ZkStateReader.toJSON(m));
   }
 
-
   private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
       final CloudState state, final String shardZkNodeName) {
 
@@ -826,10 +774,12 @@ public final class ZkController {
    */
   public void unregister(String coreName, CloudDescriptor cloudDesc)
       throws InterruptedException, KeeperException {
-    synchronized (coreStates) {
-      coreStates.remove(coreName);
-    }
-    publishState();
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+        "deletecore", ZkStateReader.CORE_NAME_PROP, coreName,
+        ZkStateReader.NODE_NAME_PROP, getNodeName(),
+        ZkStateReader.COLLECTION_PROP, cloudDesc.getCollectionName());
+    overseerStatusQueue.offer(ZkStateReader.toJSON(m));
+
     final String zkNodeName = getNodeName() + "_" + coreName;
     ElectionContext context = electionContexts.remove(zkNodeName);
     if (context != null) {
@@ -993,83 +943,6 @@ public final class ZkController {
     return zkStateReader;
   }
 
-  
-  private void publishState(CoreDescriptor cd, String shardZkNodeName, String coreName,
-      Map<String,String> props) {
-    CloudDescriptor cloudDesc = cd.getCloudDescriptor();
-    if (cloudDesc.getRoles() != null) {
-      props.put(ZkStateReader.ROLES_PROP, cloudDesc.getRoles());
-    }
-    
-    if (cloudDesc.getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getCloudState(), shardZkNodeName)) {
-      // publish with no shard id so we are assigned one, and then look for it
-      doPublish(shardZkNodeName, coreName, props, cloudDesc);
-      String shardId;
-      try {
-        shardId = doGetShardIdProcess(coreName, cloudDesc);
-      } catch (InterruptedException e) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
-      }
-      cloudDesc.setShardId(shardId);
-    }
-   
-    
-    if (!props.containsKey(ZkStateReader.SHARD_ID_PROP) && cloudDesc.getShardId() != null) {
-      props.put(ZkStateReader.SHARD_ID_PROP, cloudDesc.getShardId());
-    }
-    
-    doPublish(shardZkNodeName, coreName, props, cloudDesc);
-  }
-
-
-  private void doPublish(String shardZkNodeName, String coreName,
-      Map<String,String> props, CloudDescriptor cloudDesc) {
-    Integer numShards = cloudDesc.getNumShards();
-    if (numShards == null) {
-      numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
-    }
-    CoreState coreState = new CoreState(coreName,
-        cloudDesc.getCollectionName(), props, numShards);
-    
-    synchronized (coreStates) {
-      coreStates.put(coreName, coreState);
-    }
-    
-    publishState();
-  }
-  
-  private void publishState() {
-    final String nodePath = "/node_states/" + getNodeName();
-
-    long version;
-    byte[] coreStatesData;
-    synchronized (coreStates) {
-      version = ++coreStatesVersion;
-      coreStatesData = ZkStateReader.toJSON(coreStates.values());
-    }
-
-    // if multiple threads are trying to publish state, make sure that we never write
-    // an older version after a newer version.
-    synchronized (coreStatesPublishLock) {
-      try {
-        if (version < coreStatesPublishedVersion) {
-          log.info("Another thread already published a newer coreStates: ours="+version + " lastPublished=" + coreStatesPublishedVersion);
-        } else {
-          zkClient.setData(nodePath, coreStatesData, true);
-          coreStatesPublishedVersion = version;  // put it after so it won't be set if there's an exception
-        }
-      } catch (KeeperException e) {
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "could not publish node state", e);
-      } catch (InterruptedException e) {
-        // Restore the interrupted status
-        Thread.currentThread().interrupt();
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
-            "could not publish node state", e);
-      }
-    }
-  }
-
   private String doGetShardIdProcess(String coreName, CloudDescriptor descriptor)
       throws InterruptedException {
     final String shardZkNodeName = getNodeName() + "_" + coreName;
@@ -1086,6 +959,7 @@ public final class ZkController {
         Thread.currentThread().interrupt();
       }
     }
+    
     throw new SolrException(ErrorCode.SERVER_ERROR,
         "Could not get shard_id for core: " + coreName);
   }
@@ -1106,14 +980,30 @@ public final class ZkController {
     }
   }
   
+  /** Node-qualified core name of the form "<nodeName>_<coreName>". */
+  private String getCoreNodeName(CoreDescriptor descriptor) {
+    return getNodeName() + "_" + descriptor.getName();
+  }
+  
   public static void uploadConfigDir(SolrZkClient zkClient, File dir, String configName) throws IOException, KeeperException, InterruptedException {
     uploadToZK(zkClient, dir, ZkController.CONFIGS_ZKNODE + "/" + configName);
   }
 
-  public void preRegister(CoreDescriptor cd) {
+  public void preRegister(CoreDescriptor cd) throws KeeperException, InterruptedException {
     // before becoming available, make sure we are not live and active
     // this also gets us our assigned shard id if it was not specified
     publish(cd, ZkStateReader.DOWN); 
+    String shardZkNodeName = getCoreNodeName(cd);
+    if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getCloudState(), shardZkNodeName)) {
+      // Obtain the shard id assigned by the Overseer. InterruptedException is
+      // declared by this method and deliberately propagated rather than being
+      // converted to a SolrException, which dropped the cause and did not
+      // restore the interrupt flag; the caller can now handle it properly.
+      String shardId = doGetShardIdProcess(cd.getName(),
+          cd.getCloudDescriptor());
+      cd.getCloudDescriptor().setShardId(shardId);
+    }
+
   }
 
   private ZkCoreNodeProps waitForLeaderToSeeDownState(

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Fri Jun  8 05:21:28 2012
@@ -629,7 +629,18 @@ public class CoreContainer 
 
     if (zkController != null) {
       // this happens before we can receive requests
-      zkController.preRegister(core.getCoreDescriptor());
+      try {
+        zkController.preRegister(core.getCoreDescriptor());
+      } catch (KeeperException e) {
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      }
     }
     
     SolrCore old = null;
@@ -662,7 +673,6 @@ public class CoreContainer 
     }
   }
 
-
   private void registerInZk(SolrCore core) {
     if (zkController != null) {
       try {
@@ -676,7 +686,18 @@ public class CoreContainer 
       } catch (Exception e) {
         // if register fails, this is really bad - close the zkController to
         // minimize any damage we can cause
-        zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
+        try {
+          zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
+        } catch (KeeperException e1) {
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        }
         SolrException.log(log, "", e);
         throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
             e);

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Fri Jun  8 05:21:28 2012
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.CoreState;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -42,7 +41,6 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
@@ -62,7 +60,6 @@ public class OverseerTest extends SolrTe
     private final String nodeName;
     private final String collection;
     private final LeaderElector elector;
-    private final Map<String, CoreState> coreStates = Collections.synchronizedMap(new HashMap<String, CoreState>());
     private final Map<String, ElectionContext> electionContext = Collections.synchronizedMap(new HashMap<String, ElectionContext>());
     
     public MockZKController(String zkAddress, String nodeName, String collection) throws InterruptedException, TimeoutException, IOException, KeeperException {
@@ -71,7 +68,6 @@ public class OverseerTest extends SolrTe
       zkClient = new SolrZkClient(zkAddress, TIMEOUT);
       zkStateReader = new ZkStateReader(zkClient);
       zkStateReader.createClusterStateWatchersAndUpdate();
-      Overseer.createClientNodes(zkClient, nodeName);
       
       // live node
       final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
@@ -104,29 +100,29 @@ public class OverseerTest extends SolrTe
     public void publishState(String coreName, String stateName, int numShards)
         throws KeeperException, InterruptedException, IOException {
       if (stateName == null) {
-        coreStates.remove(coreName);
         ElectionContext ec = electionContext.remove(coreName);
         if (ec != null) {
           ec.cancelElection();
         }
+        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "deletecore",
+            ZkStateReader.NODE_NAME_PROP, nodeName,
+            ZkStateReader.CORE_NAME_PROP, coreName,
+            ZkStateReader.COLLECTION_PROP, collection);
+            DistributedQueue q = Overseer.getInQueue(zkClient);
+            q.offer(ZkStateReader.toJSON(m));
+
       } else {
-        HashMap<String,String> coreProps = new HashMap<String,String>();
-        coreProps.put(ZkStateReader.STATE_PROP, stateName);
-        coreProps.put(ZkStateReader.NODE_NAME_PROP, nodeName);
-        coreProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
-        coreProps.put(ZkStateReader.COLLECTION_PROP, collection);
-        coreProps.put(ZkStateReader.BASE_URL_PROP, "http://" + nodeName
+        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+        ZkStateReader.STATE_PROP, stateName,
+        ZkStateReader.NODE_NAME_PROP, nodeName,
+        ZkStateReader.CORE_NAME_PROP, coreName,
+        ZkStateReader.COLLECTION_PROP, collection,
+        ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
+        ZkStateReader.BASE_URL_PROP, "http://" + nodeName
             + "/solr/");
-        CoreState state = new CoreState(coreName, collection, coreProps,
-            numShards);
-        coreStates.remove(coreName);
-        coreStates.put(coreName, state);
-      }
-      final String statePath = Overseer.STATES_NODE + "/" + nodeName;
-      zkClient.setData(
-          statePath,
-          ZkStateReader.toJSON(coreStates.values().toArray(
-              new CoreState[coreStates.size()])), true);
+        DistributedQueue q = Overseer.getInQueue(zkClient);
+        q.offer(ZkStateReader.toJSON(m));
+      }
       
       for (int i = 0; i < 30; i++) {
         String shardId = getShardId(coreName);
@@ -196,8 +192,6 @@ public class OverseerTest extends SolrTe
       ZkStateReader reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      System.setProperty(ZkStateReader.NUM_SHARDS_PROP, "3");
-
       zkController = new ZkController(null, server.getZkAddress(), TIMEOUT, 10000,
           "localhost", "8983", "solr", new CurrentCoreDescriptorProvider() {
 
@@ -216,6 +210,7 @@ public class OverseerTest extends SolrTe
       
       for (int i = 0; i < numShards; i++) {
         CloudDescriptor collection1Desc = new CloudDescriptor();
+        collection1Desc.setNumShards(3);
         collection1Desc.setCollectionName("collection1");
         CoreDescriptor desc1 = new CoreDescriptor(null, "core" + (i + 1), "");
         desc1.setCloudDescriptor(collection1Desc);
@@ -238,16 +233,13 @@ public class OverseerTest extends SolrTe
       assertNotNull(reader.getLeaderUrl("collection1", "shard3", 15000));
       
     } finally {
-      System.clearProperty(ZkStateReader.NUM_SHARDS_PROP);
       System.clearProperty("bootstrap_confdir");
       if (DEBUG) {
         if (zkController != null) {
           zkClient.printLayoutToStdOut();
         }
       }
-      if (zkClient != null) {
-        zkClient.close();
-      }
+      close(zkClient);
       if (zkController != null) {
         zkController.close();
       }
@@ -266,6 +258,7 @@ public class OverseerTest extends SolrTe
     
     ZkTestServer server = new ZkTestServer(zkDir);
 
+    System.setProperty(ZkStateReader.NUM_SHARDS_PROP, Integer.toString(sliceCount));
     SolrZkClient zkClient = null;
     ZkStateReader reader = null;
     final ZkController[] controllers = new ZkController[nodeCount];
@@ -281,8 +274,6 @@ public class OverseerTest extends SolrTe
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      System.setProperty(ZkStateReader.NUM_SHARDS_PROP, Integer.valueOf(sliceCount).toString());
-
       for (int i = 0; i < nodeCount; i++) {
       
       controllers[i] = new ZkController(null, server.getZkAddress(), TIMEOUT, 10000,
@@ -313,6 +304,7 @@ public class OverseerTest extends SolrTe
           public void run() {
             final CloudDescriptor collection1Desc = new CloudDescriptor();
             collection1Desc.setCollectionName("collection1");
+            collection1Desc.setNumShards(sliceCount);
 
             final String coreName = "core" + slot;
             
@@ -402,12 +394,8 @@ public class OverseerTest extends SolrTe
           zkClient.printLayoutToStdOut();
         }
       }
-      if (zkClient != null) {
-        zkClient.close();
-      }
-      if (reader != null) {
-        reader.close();
-      }
+      close(zkClient);
+      close(reader);
       for (int i = 0; i < controllers.length; i++)
         if (controllers[i] != null) {
           controllers[i].close();
@@ -457,36 +445,23 @@ public class OverseerTest extends SolrTe
       AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
       zkClient.makePath("/live_nodes", true);
 
-      //live node
-      String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + "node1";
-      zkClient.makePath(nodePath,CreateMode.EPHEMERAL, true);
-
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      Overseer.createClientNodes(zkClient, "node1");
-      
       overseerClient = electNewOverseer(server.getZkAddress());
 
-      HashMap<String, String> coreProps = new HashMap<String,String>();
-      coreProps.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr");
-      coreProps.put(ZkStateReader.NODE_NAME_PROP, "node1");
-      coreProps.put(ZkStateReader.CORE_NAME_PROP, "core1");
-      coreProps.put(ZkStateReader.ROLES_PROP, "");
-      coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
-      CoreState state = new CoreState("core1", "collection1", coreProps, 2);
+      DistributedQueue q = Overseer.getInQueue(zkClient);
       
-      nodePath = "/node_states/node1";
-
-      try {
-        zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
-      } catch (KeeperException ke) {
-        if(ke.code()!=Code.NODEEXISTS) {
-          throw ke;
-        }
-      }
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.COLLECTION_PROP, "collection1",
+          ZkStateReader.CORE_NAME_PROP, "core1",
+          ZkStateReader.ROLES_PROP, "",
+          ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+      
+      q.offer(ZkStateReader.toJSON(m));
       
-      zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
       waitForCollections(reader, "collection1");
 
       assertEquals(reader.getCloudState().toString(), ZkStateReader.RECOVERING,
@@ -494,27 +469,24 @@ public class OverseerTest extends SolrTe
               .get("node1_core1").get(ZkStateReader.STATE_PROP));
 
       //publish node state (active)
-      coreProps.put(ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
-      
-      coreProps.put(ZkStateReader.SHARD_ID_PROP, "shard1");
-      state = new CoreState("core1", "collection1", coreProps, 2);
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.COLLECTION_PROP, "collection1",
+          ZkStateReader.CORE_NAME_PROP, "core1",
+          ZkStateReader.ROLES_PROP, "",
+          ZkStateReader.STATE_PROP, ZkStateReader.ACTIVE);
 
-      zkClient.setData(nodePath, ZkStateReader.toJSON(new CoreState[]{state}), true);
+      q.offer(ZkStateReader.toJSON(m));
 
       verifyStatus(reader, ZkStateReader.ACTIVE);
 
     } finally {
 
-      if (zkClient != null) {
-        zkClient.close();
-      }
-      if (overseerClient != null) {
-        overseerClient.close();
-      }
+      close(zkClient);
+      close(overseerClient);
 
-      if (reader != null) {
-        reader.close();
-      }
+      close(reader);
       server.shutdown();
     }
   }
@@ -608,24 +580,16 @@ public class OverseerTest extends SolrTe
       version = getCloudStateVersion(controllerClient);
       mockController.publishState("core1", null,1);
       while(version == getCloudStateVersion(controllerClient));
-      Thread.sleep(100);
+      Thread.sleep(500);
       assertEquals("Shard count does not match", 0, reader.getCloudState()
           .getSlice("collection1", "shard1").getShards().size());
     } finally {
       
-      if (mockController != null) {
-        mockController.close();
-      }
+      close(mockController);
       
-      if (overseerClient != null) {
-       overseerClient.close();
-      }
-      if (controllerClient != null) {
-        controllerClient.close();
-      }
-      if (reader != null) {
-        reader.close();
-      }
+      close(overseerClient);
+      close(controllerClient);
+      close(reader);
       server.shutdown();
     }
   }
@@ -719,18 +683,10 @@ public class OverseerTest extends SolrTe
           killerThread.join();
         }
       }
-      if (mockController != null) {
-        mockController.close();
-      }
-      if (mockController2 != null) {
-        mockController2.close();
-      }
-      if (controllerClient != null) {
-        controllerClient.close();
-      }
-      if (reader != null) {
-        reader.close();
-      }
+      close(mockController);
+      close(mockController2);
+      close(controllerClient);
+      close(reader);
       server.shutdown();
     }
   }
@@ -791,19 +747,10 @@ public class OverseerTest extends SolrTe
       assertEquals("Shard was found in more than 1 times in CloudState", 1,
           numFound);
     } finally {
-      if (overseerClient != null) {
-       overseerClient.close();
-      }
-      if (mockController != null) {
-        mockController.close();
-      }
-
-      if (controllerClient != null) {
-        controllerClient.close();
-      }
-      if (reader != null) {
-        reader.close();
-      }
+      close(overseerClient);
+      close(mockController);
+      close(controllerClient);
+      close(reader);
       server.shutdown();
     }
   }
@@ -842,23 +789,101 @@ public class OverseerTest extends SolrTe
       assertEquals("Slicecount does not match", 12, reader.getCloudState().getSlices("collection1").size());
       
     } finally {
-      if (overseerClient != null) {
-       overseerClient.close();
-      }
-      if (mockController != null) {
-        mockController.close();
-      }
+      close(overseerClient);
+      close(mockController);
+      close(controllerClient);
+      close(reader);
+      server.shutdown();
+    }
+  }
 
-      if (controllerClient != null) {
-        controllerClient.close();
-      }
-      if (reader != null) {
-        reader.close();
+  private void close(MockZKController mockController) {
+    if (mockController != null) {
+      mockController.close();
+    }
+  }
+
+  
+  @Test
+  public void testReplay() throws Exception{
+    String zkDir = dataDir.getAbsolutePath() + File.separator
+        + "zookeeper/server1/data";
+    ZkTestServer server = new ZkTestServer(zkDir);
+    SolrZkClient zkClient = null;
+    SolrZkClient overseerClient = null;
+    ZkStateReader reader = null;
+    
+    try {
+      server.run();
+      zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+      zkClient.makePath(ZkStateReader.LIVE_NODES_ZKNODE, true);
+
+      reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+      //prepopulate work queue with some items to emulate previous overseer died before persisting state
+      DistributedQueue queue = Overseer.getInternalQueue(zkClient);
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.SHARD_ID_PROP, "s1",
+          ZkStateReader.COLLECTION_PROP, "collection1",
+          ZkStateReader.CORE_NAME_PROP, "core1",
+          ZkStateReader.ROLES_PROP, "",
+          ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+      queue.offer(ZkStateReader.toJSON(m));
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.SHARD_ID_PROP, "s1",
+          ZkStateReader.COLLECTION_PROP, "collection1",
+          ZkStateReader.CORE_NAME_PROP, "core2",
+          ZkStateReader.ROLES_PROP, "",
+          ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+      queue.offer(ZkStateReader.toJSON(m));
+      
+      overseerClient = electNewOverseer(server.getZkAddress());
+      
+      //submit to proper queue
+      queue = Overseer.getInQueue(zkClient);
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+          ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.SHARD_ID_PROP, "s1",
+          ZkStateReader.COLLECTION_PROP, "collection1",
+          ZkStateReader.CORE_NAME_PROP, "core3",
+          ZkStateReader.ROLES_PROP, "",
+          ZkStateReader.STATE_PROP, ZkStateReader.RECOVERING);
+      queue.offer(ZkStateReader.toJSON(m));
+      
+      for(int i=0;i<100;i++) {
+        Slice s = reader.getCloudState().getSlice("collection1", "s1");
+        if(s!=null && s.getShards().size()==3) break;
+        Thread.sleep(100);
       }
+      assertNotNull(reader.getCloudState().getSlice("collection1", "s1"));
+      assertEquals(3, reader.getCloudState().getSlice("collection1", "s1").getShards().size());
+    } finally {
+      close(overseerClient);
+      close(zkClient);
+      close(reader);
       server.shutdown();
     }
   }
 
+  private void close(ZkStateReader reader) {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+  private void close(SolrZkClient overseerClient) throws InterruptedException {
+    if (overseerClient != null) {
+      overseerClient.close();
+    }
+  }
+  
   private int getCloudStateVersion(SolrZkClient controllerClient)
       throws KeeperException, InterruptedException {
     return controllerClient.exists(ZkStateReader.CLUSTER_STATE, null, false).getVersion();
@@ -870,9 +895,10 @@ public class OverseerTest extends SolrTe
     SolrZkClient zkClient  = new SolrZkClient(address, TIMEOUT);
     ZkStateReader reader = new ZkStateReader(zkClient);
     LeaderElector overseerElector = new LeaderElector(zkClient);
-    ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, reader);
+    ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), reader);
     overseerElector.setup(ec);
     overseerElector.joinElection(ec);
     return zkClient;
   }
+  
 }
\ No newline at end of file

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java?rev=1347880&r1=1347879&r2=1347880&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/CloudState.java Fri Jun  8 05:21:28 2012
@@ -89,6 +89,19 @@ public class CloudState implements JSONW
 	  if (collectionLeaders == null) return null;
 	  return collectionLeaders.get(shard);
 	}
+	
+	/**
+	 * Get shard properties or null if shard is not found.
+	 */
+	public ZkNodeProps getShardProps(final String collection, final String coreNodeName) {
+	  Map<String, Slice> slices = getSlices(collection);
+	  for(Slice slice: slices.values()) {
+	    if(slice.getShards().get(coreNodeName)!=null) {
+	      return slice.getShards().get(coreNodeName);
+	    }
+	  }
+	  return null;
+	}
 
   private void addRangeInfos(Set<String> collections) {
     for (String collection : collections) {