You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2015/09/02 15:06:22 UTC

svn commit: r1700800 [10/24] - in /lucene/dev/branches/lucene6699: ./ dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/ dev-tools/scripts/ lucene/ lucene/analysis/ lucene/analysis/common/ lucene/analysis/common/src/java/org/apache/lucene/analysis/ar/...

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Wed Sep  2 13:06:13 2015
@@ -18,19 +18,21 @@
 
 package org.apache.solr.cloud;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -40,23 +42,60 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A distributed queue from zk recipes.
+ * A distributed queue.
  */
 public class DistributedQueue {
   private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
-  
-  private static long DEFAULT_TIMEOUT = 5*60*1000;
-  
-  private final String dir;
-  
-  private SolrZkClient zookeeper;
-  
-  private final String prefix = "qn-";
-  
-  private final String response_prefix = "qnr-" ;
 
-  private final Overseer.Stats stats;
-  
+  static final String PREFIX = "qn-";
+
+  /**
+   * Theory of operation:
+   * <p>
+   * Under ordinary circumstances we neither watch nor poll for children in ZK.
+   * Instead we keep an in-memory list of known child names.  When the in-memory
+   * list is exhausted, we then fetch from ZK.
+   * <p>
+   * A child watcher is only set when we re-fetch from ZK, i.e. after the in-memory list is exhausted.
+   */
+  private static final Object _IMPLEMENTATION_NOTES = null;
+
+  final String dir;
+
+  final SolrZkClient zookeeper;
+
+  final Overseer.Stats stats;
+
+  /**
+   * A lock that guards all of the mutable state that follows.
+   */
+  private final ReentrantLock updateLock = new ReentrantLock();
+
+  /**
+   * Contains the last set of children fetched from ZK. Elements are removed from the head of
+   * this in-memory set as they are consumed from the queue.  Due to the distributed nature
+   * of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
+   * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
+   * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
+   */
+  private TreeSet<String> knownChildren = new TreeSet<>();
+
+  /**
+   * Used to wait on a non-empty queue; you must hold {@link #updateLock} and verify that
+   * {@link #knownChildren} is empty before waiting on this condition.
+   */
+  private final Condition notEmpty = updateLock.newCondition();
+
+  /**
+   * If non-null, the last watcher to listen for child changes.
+   */
+  private ChildWatcher lastWatcher = null;
+
+  /**
+   * If true, ZK's child list probably doesn't match what's in memory.
+   */
+  private boolean isDirty = true;
+
   public DistributedQueue(SolrZkClient zookeeper, String dir) {
     this(zookeeper, dir, new Overseer.Stats());
   }
@@ -77,554 +116,312 @@ public class DistributedQueue {
     this.zookeeper = zookeeper;
     this.stats = stats;
   }
-  
-  /**
-   * Returns a Map of the children, ordered by id.
-   * 
-   * @param watcher
-   *          optional watcher on getChildren() operation.
-   * @return map from id to child name for all children
-   */
-  private TreeMap<Long,String> orderedChildren(Watcher watcher)
-      throws KeeperException, InterruptedException {
-    TreeMap<Long,String> orderedChildren = new TreeMap<>();
-
-    List<String> childNames = zookeeper.getChildren(dir, watcher, true);
-    stats.setQueueLength(childNames.size());
-    for (String childName : childNames) {
-      try {
-        // Check format
-        if (!childName.regionMatches(0, prefix, 0, prefix.length())) {
-          LOG.debug("Found child node with improper name: " + childName);
-          continue;
-        }
-        String suffix = childName.substring(prefix.length());
-        Long childId = new Long(suffix);
-        orderedChildren.put(childId, childName);
-      } catch (NumberFormatException e) {
-        LOG.warn("Found child node with improper format : " + childName + " "
-            + e, e);
-      }
-    }
-    
-    return orderedChildren;
-  }
-
 
   /**
-   * Returns true if the queue contains a task with the specified async id.
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty.
+   *
+   * @return data at the first element of the queue, or null.
    */
-  public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
-      throws KeeperException, InterruptedException {
-
-    List<String> childNames = zookeeper.getChildren(dir, null, true);
-    stats.setQueueLength(childNames.size());
-    for (String childName : childNames) {
-      if (childName != null) {
-        try {
-          byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
-          if (data != null) {
-            ZkNodeProps message = ZkNodeProps.load(data);
-            if (message.containsKey(requestIdKey)) {
-              LOG.debug(">>>> {}", message.get(requestIdKey));
-              if(message.get(requestIdKey).equals(requestId)) return true;
-            }
-          }
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
-        }
-      }
+  public byte[] peek() throws KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_peek");
+    try {
+      return firstElement();
+    } finally {
+      time.stop();
     }
-
-    return false;
   }
 
-
   /**
-   * Return the head of the queue without modifying the queue.
-   * 
-   * @return the data at the head of the queue.
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty and block is false.
+   *
+   * @param block if true, blocks until an element enters the queue
+   * @return data at the first element of the queue, or null.
    */
-  private QueueEvent element() throws KeeperException,
-      InterruptedException {
-    TreeMap<Long,String> orderedChildren;
-    
-    // element, take, and remove follow the same pattern.
-    // We want to return the child node with the smallest sequence number.
-    // Since other clients are remove()ing and take()ing nodes concurrently,
-    // the child with the smallest sequence number in orderedChildren might be
-    // gone by the time we check.
-    // We don't call getChildren again until we have tried the rest of the nodes
-    // in sequence order.
-    while (true) {
-      try {
-        orderedChildren = orderedChildren(null);
-      } catch (KeeperException.NoNodeException e) {
-        return null;
-      }
-      if (orderedChildren.size() == 0) return null;
-      
-      for (String headNode : orderedChildren.values()) {
-        if (headNode != null) {
-          try {
-            return new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode, null, null, true), null);
-          } catch (KeeperException.NoNodeException e) {
-            // Another client removed the node first, try next
-          }
-        }
-      }
-    }
+  public byte[] peek(boolean block) throws KeeperException, InterruptedException {
+    return block ? peek(Long.MAX_VALUE) : peek();
   }
-  
+
   /**
-   * Attempts to remove the head of the queue and return it.
-   * 
-   * @return The former head of the queue
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty after wait ms.
+   *
+   * @param wait max wait time in ms.
+   * @return data at the first element of the queue, or null.
    */
-  public byte[] remove() throws NoSuchElementException, KeeperException,
-      InterruptedException {
-    TreeMap<Long,String> orderedChildren;
-    // Same as for element. Should refactor this.
-    TimerContext time = stats.time(dir + "_remove");
+  public byte[] peek(long wait) throws KeeperException, InterruptedException {
+    Preconditions.checkArgument(wait > 0);
+    TimerContext time;
+    if (wait == Long.MAX_VALUE) {
+      time = stats.time(dir + "_peek_wait_forever");
+    } else {
+      time = stats.time(dir + "_peek_wait" + wait);
+    }
+    updateLock.lockInterruptibly();
     try {
-      while (true) {
-        try {
-          orderedChildren = orderedChildren(null);
-        } catch (KeeperException.NoNodeException e) {
-          throw new NoSuchElementException();
-        }
-        if (orderedChildren.size() == 0) throw new NoSuchElementException();
-
-        for (String headNode : orderedChildren.values()) {
-          String path = dir + "/" + headNode;
-          try {
-            byte[] data = zookeeper.getData(path, null, null, true);
-            zookeeper.delete(path, -1, true);
-            return data;
-          } catch (KeeperException.NoNodeException e) {
-            // Another client deleted the node first.
-          }
+      long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+      while (waitNanos > 0) {
+        byte[] result = firstElement();
+        if (result != null) {
+          return result;
         }
-
+        waitNanos = notEmpty.awaitNanos(waitNanos);
       }
+      return null;
     } finally {
+      updateLock.unlock();
       time.stop();
     }
   }
-  
+
   /**
-   * Remove the event and save the response into the other path.
-   * 
+   * Attempts to remove the head of the queue and return it. Returns null if the
+   * queue is empty.
+   *
+   * @return Head of the queue or null.
    */
-  public byte[] remove(QueueEvent event) throws KeeperException,
-      InterruptedException {
-    TimerContext time = stats.time(dir + "_remove_event");
+  public byte[] poll() throws KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_poll");
     try {
-      String path = event.getId();
-      String responsePath = dir + "/" + response_prefix
-          + path.substring(path.lastIndexOf("-") + 1);
-      if (zookeeper.exists(responsePath, true)) {
-        zookeeper.setData(responsePath, event.getBytes(), true);
-      }
-      byte[] data = zookeeper.getData(path, null, null, true);
-      zookeeper.delete(path, -1, true);
-      return data;
+      return removeFirst();
     } finally {
       time.stop();
     }
   }
 
   /**
-   * Watcher that blocks until a WatchedEvent occurs for a znode.
+   * Attempts to remove the head of the queue and return it.
+   *
+   * @return The former head of the queue
    */
-  private final class LatchWatcher implements Watcher {
-
-    private final Object lock;
-    private WatchedEvent event;
-    private Event.EventType latchEventType;
-
-    LatchWatcher(Object lock) {
-      this(lock, null);
-    }
-
-    LatchWatcher(Event.EventType eventType) {
-      this(new Object(), eventType);
-    }
-
-    LatchWatcher(Object lock, Event.EventType eventType) {
-      this.lock = lock;
-      this.latchEventType = eventType;
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-      Event.EventType eventType = event.getType();
-      // None events are ignored
-      // If latchEventType is not null, only fire if the type matches
-      if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
-        LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
-        synchronized (lock) {
-          this.event = event;
-          lock.notifyAll();
-        }
-      }
-    }
-
-    public void await(long timeout) throws InterruptedException {
-      synchronized (lock) {
-        if (this.event != null) return;
-        lock.wait(timeout);
-      }
-    }
-
-    public WatchedEvent getWatchedEvent() {
-      return event;
-    }
-  }
-
-  // we avoid creating *many* watches in some cases
-  // by saving the childrenWatcher and the children associated - see SOLR-6336
-  private LatchWatcher childrenWatcher;
-  private TreeMap<Long,String> fetchedChildren;
-  private final Object childrenWatcherLock = new Object();
-
-  private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
-  {
-    LatchWatcher watcher;
-    TreeMap<Long,String> children;
-    synchronized (childrenWatcherLock) {
-      watcher = childrenWatcher;
-      children = fetchedChildren;
-    }
-
-    if (watcher == null ||  watcher.getWatchedEvent() != null) {
-      // this watcher is only interested in child change events
-      watcher = new LatchWatcher(Watcher.Event.EventType.NodeChildrenChanged);
-      while (true) {
-        try {
-          children = orderedChildren(watcher);
-          break;
-        } catch (KeeperException.NoNodeException e) {
-          zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
-          // go back to the loop and try again
-        }
-      }
-      synchronized (childrenWatcherLock) {
-        childrenWatcher = watcher;
-        fetchedChildren = children;
-      }
-    }
-
-    while (true) {
-      if (!children.isEmpty()) break;
-      watcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
-      if (watcher.getWatchedEvent() != null) {
-        children = orderedChildren(null);
+  public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_remove");
+    try {
+      byte[] result = removeFirst();
+      if (result == null) {
+        throw new NoSuchElementException();
       }
-      if (wait != Long.MAX_VALUE) break;
+      return result;
+    } finally {
+      time.stop();
     }
-    return Collections.unmodifiableMap(children);
   }
 
   /**
    * Removes the head of the queue and returns it, blocks until it succeeds.
-   * 
+   *
    * @return The former head of the queue
    */
   public byte[] take() throws KeeperException, InterruptedException {
     // Same as for element. Should refactor this.
     TimerContext timer = stats.time(dir + "_take");
+    updateLock.lockInterruptibly();
     try {
-      Map<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
-        try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          zookeeper.delete(path, -1, true);
-          return data;
-        } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+      while (true) {
+        byte[] result = removeFirst();
+        if (result != null) {
+          return result;
         }
+        notEmpty.await();
       }
-      return null; // shouldn't really reach here..
     } finally {
+      updateLock.unlock();
       timer.stop();
     }
   }
-  
+
   /**
-   * Inserts data into queue.
-   * 
-   * @return true if data was successfully added
+   * Inserts data into queue.  Successfully calling this method does NOT guarantee
+   * that the element will be immediately available in the in-memory queue. In particular,
+   * calling this method on an empty queue will not necessarily cause {@link #poll()} to
+   * return the offered element.  Use a blocking method if you must wait for the offered
+   * element to become visible.
    */
-  public boolean offer(byte[] data) throws KeeperException,
-      InterruptedException {
+  public void offer(byte[] data) throws KeeperException, InterruptedException {
     TimerContext time = stats.time(dir + "_offer");
     try {
-      return createData(dir + "/" + prefix, data,
-          CreateMode.PERSISTENT_SEQUENTIAL) != null;
-    } finally {
-      time.stop();
-    }
-  }
-  
-  /**
-   * Inserts data into zookeeper.
-   * 
-   * @return true if data was successfully added
-   */
-  private String createData(String path, byte[] data, CreateMode mode)
-      throws KeeperException, InterruptedException {
-    for (;;) {
-      try {
-        return zookeeper.create(path, data, mode, true);
-      } catch (KeeperException.NoNodeException e) {
+      while (true) {
         try {
-          zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
-        } catch (KeeperException.NodeExistsException ne) {
-          // someone created it
+          // We don't need to explicitly set isDirty here; if there is a watcher, it will
+          // see the update and set the bit itself; if there is no watcher we can defer
+          // the update anyway.
+          zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+          return;
+        } catch (KeeperException.NoNodeException e) {
+          try {
+            zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+          } catch (KeeperException.NodeExistsException ne) {
+            // someone created it
+          }
         }
       }
+    } finally {
+      time.stop();
     }
   }
-  
+
+  public Overseer.Stats getStats() {
+    return stats;
+  }
+
   /**
-   * Offer the data and wait for the response
-   * 
+   * Returns the name of the first known child node, or {@code null} if the queue is empty.
+   * This is the only place {@link #knownChildren} is ever updated!
+   * The caller must double check that the actual node still exists, since the in-memory
+   * list is inherently stale.
    */
-  public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
-      InterruptedException {
-    TimerContext time = stats.time(dir + "_offer");
+  private String firstChild(boolean remove) throws KeeperException, InterruptedException {
+    updateLock.lockInterruptibly();
     try {
-      String path = createData(dir + "/" + prefix, data,
-          CreateMode.PERSISTENT_SEQUENTIAL);
-      String watchID = createData(
-          dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
-          null, CreateMode.EPHEMERAL);
-
-      Object lock = new Object();
-      LatchWatcher watcher = new LatchWatcher(lock);
-      synchronized (lock) {
-        if (zookeeper.exists(watchID, watcher, true) != null) {
-          watcher.await(timeout);
-        }
+      // Try to fetch the first in-memory child.
+      if (!knownChildren.isEmpty()) {
+        return remove ? knownChildren.pollFirst() : knownChildren.first();
       }
-      byte[] bytes = zookeeper.getData(watchID, null, null, true);
-      zookeeper.delete(watchID, -1, true);
-      return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
+
+      if (lastWatcher != null && !isDirty) {
+        // No children, no known updates, and a watcher is already set; nothing we can do.
+        return null;
+      }
+
+      // Try to fetch an updated list of children from ZK.
+      ChildWatcher newWatcher = new ChildWatcher();
+      knownChildren = fetchZkChildren(newWatcher);
+      lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
+      isDirty = false;
+      if (knownChildren.isEmpty()) {
+        return null;
+      }
+      notEmpty.signalAll();
+      return remove ? knownChildren.pollFirst() : knownChildren.first();
     } finally {
-      time.stop();
+      updateLock.unlock();
     }
   }
-  
+
   /**
-   * Returns the data at the first element of the queue, or null if the queue is
-   * empty.
-   * 
-   * @return data at the first element of the queue, or null.
+   * Return the current set of children from ZK; does not change internal state.
    */
-  public byte[] peek() throws KeeperException, InterruptedException {
-    TimerContext time = stats.time(dir + "_peek");
-    try {
-      QueueEvent element = element();
-      if (element == null) return null;
-      return element.getBytes();
-    } finally {
-      time.stop();
-    }
-  }
-  
-  public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, Long wait)
-      throws KeeperException, InterruptedException {
-    ArrayList<QueueEvent> topN = new ArrayList<>();
-
-    LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
-    TimerContext time = null;
-    if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
-    else time = stats.time(dir + "_peekTopN_wait" + wait);
+  TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
+    while (true) {
+      try {
+        TreeSet<String> orderedChildren = new TreeSet<>();
 
-    try {
-      Map<Long, String> orderedChildren = getChildren(wait);
-      for (String headNode : orderedChildren.values()) {
-        if (headNode != null && topN.size() < n) {
-          try {
-            String id = dir + "/" + headNode;
-            if (excludeSet != null && excludeSet.contains(id)) continue;
-            QueueEvent queueEvent = new QueueEvent(id,
-                zookeeper.getData(dir + "/" + headNode, null, null, true), null);
-            topN.add(queueEvent);
-          } catch (KeeperException.NoNodeException e) {
-            // Another client removed the node first, try next
-          }
-        } else {
-          if (topN.size() >= 1) {
-            printQueueEventsListElementIds(topN);
-            return topN;
+        List<String> childNames = zookeeper.getChildren(dir, watcher, true);
+        stats.setQueueLength(childNames.size());
+        for (String childName : childNames) {
+          // Check format
+          if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
+            LOG.debug("Found child node with improper name: " + childName);
+            continue;
           }
+          orderedChildren.add(childName);
         }
+        return orderedChildren;
+      } catch (KeeperException.NoNodeException e) {
+        zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+        // go back to the loop and try again
       }
-
-      if (topN.size() > 0 ) {
-        printQueueEventsListElementIds(topN);
-        return topN;
-      }
-      return null;
-    } finally {
-      time.stop();
     }
   }
 
-  private void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
-    if(LOG.isDebugEnabled()) {
-      StringBuffer sb = new StringBuffer("[");
-      for(QueueEvent queueEvent: topN) {
-        sb.append(queueEvent.getId()).append(", ");
+  /**
+   * Return the currently-known set of children from memory. If there are no children,
+   * waits up to {@code waitMillis} for at least one child to become available. May
+   * update the set of known children.
+   */
+  SortedSet<String> getChildren(long waitMillis) throws KeeperException, InterruptedException {
+    long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+    while (waitNanos > 0) {
+      // Trigger a fetch if needed.
+      firstElement();
+      updateLock.lockInterruptibly();
+      try {
+        if (!knownChildren.isEmpty()) {
+          return new TreeSet<>(knownChildren);
+        }
+        waitNanos = notEmpty.awaitNanos(waitNanos);
+      } finally {
+        updateLock.unlock();
       }
-      sb.append("]");
-      LOG.debug("Returning topN elements: {}", sb.toString());
     }
+    return Collections.emptySortedSet();
   }
 
-
   /**
+   * Return the head of the queue without modifying the queue.
    *
-   * Gets last element of the Queue without removing it.
+   * @return the data at the head of the queue.
    */
-  public String getTailId() throws KeeperException, InterruptedException {
-    TreeMap<Long, String> orderedChildren = null;
-    orderedChildren = orderedChildren(null);
-    if(orderedChildren == null || orderedChildren.isEmpty()) return null;
-
-    for(String headNode : orderedChildren.descendingMap().values())
-      if (headNode != null) {
+  private byte[] firstElement() throws KeeperException, InterruptedException {
+    while (true) {
+      String firstChild = firstChild(false);
+      if (firstChild == null) {
+        return null;
+      }
+      try {
+        return zookeeper.getData(dir + "/" + firstChild, null, null, true);
+      } catch (KeeperException.NoNodeException e) {
+        // Another client deleted the node first, remove the in-memory and retry.
+        updateLock.lockInterruptibly();
         try {
-          QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
-              null, null, true), null);
-          return queueEvent.getId();
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
+          knownChildren.remove(firstChild);
+        } finally {
+          updateLock.unlock();
         }
       }
-    return null;
-  }
-  
-  public static class QueueEvent {
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((id == null) ? 0 : id.hashCode());
-      return result;
-    }
-    
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) return true;
-      if (obj == null) return false;
-      if (getClass() != obj.getClass()) return false;
-      QueueEvent other = (QueueEvent) obj;
-      if (id == null) {
-        if (other.id != null) return false;
-      } else if (!id.equals(other.id)) return false;
-      return true;
-    }
-    
-    private WatchedEvent event = null;
-    private String id;
-    private byte[] bytes;
-    
-    QueueEvent(String id, byte[] bytes, WatchedEvent event) {
-      this.id = id;
-      this.bytes = bytes;
-      this.event = event;
-    }
-    
-    public void setId(String id) {
-      this.id = id;
-    }
-    
-    public String getId() {
-      return id;
-    }
-    
-    public void setBytes(byte[] bytes) {
-      this.bytes = bytes;
-    }
-    
-    public byte[] getBytes() {
-      return bytes;
-    }
-    
-    public WatchedEvent getWatchedEvent() {
-      return event;
     }
-    
-  }
-  
-  /**
-   * Returns the data at the first element of the queue, or null if the queue is
-   * empty and block is false.
-   * 
-   * @param block if true, blocks until an element enters the queue
-   * @return data at the first element of the queue, or null.
-   */
-  public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
-    return peek(block ? Long.MAX_VALUE : 0);
   }
 
-  /**
-   * Returns the data at the first element of the queue, or null if the queue is
-   * empty after wait ms.
-   * 
-   * @param wait max wait time in ms.
-   * @return data at the first element of the queue, or null.
-   */
-  public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
-    TimerContext time = null;
-    if (wait == Long.MAX_VALUE) {
-      time = stats.time(dir + "_peek_wait_forever");
-    } else {
-      time = stats.time(dir + "_peek_wait" + wait);
-    }
-    try {
-      if (wait == 0) {
-        return element();
+  private byte[] removeFirst() throws KeeperException, InterruptedException {
+    while (true) {
+      String firstChild = firstChild(true);
+      if (firstChild == null) {
+        return null;
       }
-
-      Map<Long, String> orderedChildren = getChildren(wait);
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
+      try {
+        String path = dir + "/" + firstChild;
+        byte[] result = zookeeper.getData(path, null, null, true);
+        zookeeper.delete(path, -1, true);
+        return result;
+      } catch (KeeperException.NoNodeException e) {
+        // Another client deleted the node first, remove the in-memory and retry.
+        updateLock.lockInterruptibly();
         try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          return new QueueEvent(path, data, null);
-        } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+          knownChildren.remove(firstChild);
+        } finally {
+          updateLock.unlock();
         }
       }
-      return null;
-    } finally {
-      time.stop();
     }
   }
-  
-  /**
-   * Attempts to remove the head of the queue and return it. Returns null if the
-   * queue is empty.
-   * 
-   * @return Head of the queue or null.
-   */
-  public byte[] poll() throws KeeperException, InterruptedException {
-    TimerContext time = stats.time(dir + "_poll");
+
+  @VisibleForTesting boolean hasWatcher() throws InterruptedException {
+    updateLock.lockInterruptibly();
     try {
-      return remove();
-    } catch (NoSuchElementException e) {
-      return null;
+      return lastWatcher != null;
     } finally {
-      time.stop();
+      updateLock.unlock();
     }
   }
 
-  public Overseer.Stats getStats() {
-    return stats;
+  private class ChildWatcher implements Watcher {
+
+    @Override
+    public void process(WatchedEvent event) {
+      updateLock.lock();
+      try {
+        // this watcher is automatically cleared when fired
+        if (lastWatcher == this) {
+          lastWatcher = null;
+        }
+        // Do no ZK reads or other real work in this callback; just signal state back to client threads.
+        isDirty = true;
+        // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
+        notEmpty.signalAll();
+      } finally {
+        updateLock.unlock();
+      }
+    }
   }
 }

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Sep  2 13:06:13 2015
@@ -2,10 +2,12 @@ package org.apache.solr.cloud;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
@@ -30,6 +32,11 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.OpResult.SetDataResult;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,16 +80,16 @@ public abstract class ElectionContext im
   }
   
   public void cancelElection() throws InterruptedException, KeeperException {
-    if( leaderSeqPath != null ){
+    if (leaderSeqPath != null) {
       try {
-        log.info("canceling election {}",leaderSeqPath );
+        log.info("Canceling election {}", leaderSeqPath);
         zkClient.delete(leaderSeqPath, -1, true);
       } catch (NoNodeException e) {
         // fine
-        log.warn("cancelElection did not find election node to remove {}" ,leaderSeqPath);
+        log.info("cancelElection did not find election node to remove {}", leaderSeqPath);
       }
     } else {
-      log.warn("cancelElection skipped as this context has not been initialized");
+      log.info("cancelElection skipped as this context has not been initialized");
     }
   }
 
@@ -104,6 +111,7 @@ class ShardLeaderElectionContextBase ext
   protected String shardId;
   protected String collection;
   protected LeaderElector leaderElector;
+  protected volatile Integer leaderZkNodeParentVersion;
   
   public ShardLeaderElectionContextBase(LeaderElector leaderElector,
       final String shardId, final String collection, final String coreNodeName,
@@ -129,25 +137,81 @@ class ShardLeaderElectionContextBase ext
   }
   
   @Override
+  public void cancelElection() throws InterruptedException, KeeperException {
+    if (leaderZkNodeParentVersion != null) {
+      try {
+        // We need to be careful and make sure we *only* delete our own leader registration node.
+        // We do this by using a multi and ensuring the parent znode of the leader registration node
+        // matches the version we expect - there is a setData call that increments the parent's znode
+        // version whenever a leader registers.
+        log.info("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion);
+        List<Op> ops = new ArrayList<>(2);
+        ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
+        ops.add(Op.delete(leaderPath, -1));
+        zkClient.multi(ops, true);
+      } catch (KeeperException.NoNodeException nne) {
+        // no problem
+        log.info("No leader registration node found to remove: {}", leaderPath);
+      } catch (KeeperException.BadVersionException bve) {
+        log.info("Cannot remove leader registration node because the current registered node is not ours: {}", leaderPath);
+        // no problem
+      } catch (InterruptedException e) {
+        throw e;
+      } catch (Exception e) {
+        SolrException.log(log, e);
+      }
+      leaderZkNodeParentVersion = null;
+    } else {
+      log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
+    }
+    super.cancelElection();
+  }
+  
+  @Override
   void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
       throws KeeperException, InterruptedException, IOException {
-    // register as leader - if an ephemeral is already there, wait just a bit
-    // to see if it goes away
+    // register as leader - if an ephemeral is already there, wait to see if it goes away
+    String parent = new Path(leaderPath).getParent().toString();
+    ZkCmdExecutor zcmd = new ZkCmdExecutor(30000);
+    zcmd.ensureExists(parent, zkClient);
+
     try {
-      RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
-          new RetryCmd() {
-            @Override
-            public void execute() throws Throwable {
-              zkClient.makePath(leaderPath, Utils.toJSON(leaderProps), CreateMode.EPHEMERAL, true);
+      RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, new RetryCmd() {
+        
+        @Override
+        public void execute() throws InterruptedException, KeeperException {
+          log.info("Creating leader registration node", leaderPath);
+          List<Op> ops = new ArrayList<>(2);
+          
+          // We use a multi operation to get the parent node's version, which will
+          // be used to make sure we only remove our own leader registration node.
+          // The setData call used to get the parent version is also the trigger to
+          // increment the version. We also do a sanity check that our leaderSeqPath exists.
+          
+          ops.add(Op.check(leaderSeqPath, -1));
+          ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL));
+          ops.add(Op.setData(parent, null, -1));
+          List<OpResult> results;
+          
+          results = zkClient.multi(ops, true);
+          
+          for (OpResult result : results) {
+            if (result.getType() == ZooDefs.OpCode.setData) {
+              SetDataResult dresult = (SetDataResult) result;
+              Stat stat = dresult.getStat();
+              leaderZkNodeParentVersion = stat.getVersion();
+              return;
             }
           }
-      );
+          assert leaderZkNodeParentVersion != null;
+        }
+      });
     } catch (Throwable t) {
       if (t instanceof OutOfMemoryError) {
         throw (OutOfMemoryError) t;
       }
       throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t);
-    }
+    } 
     
     assert shardId != null;
     ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
@@ -158,6 +222,10 @@ class ShardLeaderElectionContextBase ext
         leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
         ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
     Overseer.getInQueue(zkClient).offer(Utils.toJSON(m));
+  }
+
+  public LeaderElector getLeaderElector() {
+    return leaderElector;
   }  
 }
 
@@ -203,7 +271,6 @@ final class ShardLeaderElectionContext e
     ActionThrottle lt;
     try (SolrCore core = cc.getCore(coreName)) {
       if (core == null) {
-        cancelElection();
         throw new SolrException(ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
       }
       MDCLoggingContext.setCore(core);
@@ -225,6 +292,13 @@ final class ShardLeaderElectionContext e
         waitForReplicasToComeUp(leaderVoteWait);
       }
       
+      if (isClosed) {
+        // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the latter, 
+        // we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will 
+        // re-register the cores and handle a new leadership election.
+        return;
+      }
+      
       try (SolrCore core = cc.getCore(coreName)) {
         
         if (core == null) {
@@ -312,34 +386,38 @@ final class ShardLeaderElectionContext e
       }
       
       boolean isLeader = true;
-      try {
-        super.runLeaderProcess(weAreReplacement, 0);
-      } catch (Exception e) {
-        isLeader = false;
-        SolrException.log(log, "There was a problem trying to register as the leader", e);
-        
-        try (SolrCore core = cc.getCore(coreName)) {
+      if (!isClosed) {
+        try {
+          super.runLeaderProcess(weAreReplacement, 0);
+        } catch (Exception e) {
+          isLeader = false;
+          SolrException.log(log, "There was a problem trying to register as the leader", e);
           
-          if (core == null) {
-            log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
-            return;
+          try (SolrCore core = cc.getCore(coreName)) {
+            
+            if (core == null) {
+              log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+              return;
+            }
+            
+            core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
+            
+            // we could not publish ourselves as leader - try and rejoin election
+            rejoinLeaderElection(core);
           }
-          
-          core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
-          
-          // we could not publish ourselves as leader - try and rejoin election
-          rejoinLeaderElection(core);
         }
-      }
-      
-      if (isLeader) {
-        // check for any replicas in my shard that were set to down by the previous leader
-        try {
-          startLeaderInitiatedRecoveryOnReplicas(coreName);
-        } catch (Exception exc) {
-          // don't want leader election to fail because of
-          // an error trying to tell others to recover
+        
+        if (isLeader) {
+          // check for any replicas in my shard that were set to down by the previous leader
+          try {
+            startLeaderInitiatedRecoveryOnReplicas(coreName);
+          } catch (Exception exc) {
+            // don't want leader election to fail because of
+            // an error trying to tell others to recover
+          }
         }
+      } else {
+        cancelElection();
       }
     } finally {
       MDCLoggingContext.clear();

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Wed Sep  2 13:06:13 2015
@@ -22,10 +22,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.solr.cloud.ZkController.ContextKey;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -70,11 +72,21 @@ public  class LeaderElector {
 
   private ElectionWatcher watcher;
 
+  private Map<ContextKey,ElectionContext> electionContexts;
+  private ContextKey contextKey;
+
   public LeaderElector(SolrZkClient zkClient) {
     this.zkClient = zkClient;
     zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
   }
   
+  public LeaderElector(SolrZkClient zkClient, ContextKey key, Map<ContextKey,ElectionContext> electionContexts) {
+    this.zkClient = zkClient;
+    zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
+    this.electionContexts = electionContexts;
+    this.contextKey = key;
+  }
+
   public ElectionContext getContext() {
     return context;
   }
@@ -140,20 +152,6 @@ public  class LeaderElector {
         retryElection(context, false);//join at the tail again
         return;
       }
-      // first we delete the node advertising the old leader in case the ephem is still there
-      try {
-        zkClient.delete(context.leaderPath, -1, true);
-      }catch (KeeperException.NoNodeException nne){
-        //no problem
-      }catch (InterruptedException e){
-        throw e;
-      } catch (Exception e) {
-        //failed to delete the leader node
-        log.error("leader elect delete error",e);
-        retryElection(context, false);
-        return;
-        // fine
-      }
 
       try {
         runIamLeaderProcess(context, replacement);
@@ -280,7 +278,7 @@ public  class LeaderElector {
       try {
         if(joinAtHead){
           log.info("Node {} trying to join election at the head", id);
-          List<String> nodes = OverseerProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
+          List<String> nodes = OverseerTaskProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
           if(nodes.size() <2){
             leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
                 CreateMode.EPHEMERAL_SEQUENTIAL, false);
@@ -423,6 +421,9 @@ public  class LeaderElector {
   void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
     ElectionWatcher watcher = this.watcher;
     ElectionContext ctx = context.copy();
+    if (electionContexts != null) {
+      electionContexts.put(contextKey, ctx);
+    }
     if (watcher != null) watcher.cancel();
     this.context.cancelElection();
     this.context = ctx;

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/Overseer.java Wed Sep  2 13:06:13 2015
@@ -136,7 +136,7 @@ public class Overseer implements Closeab
 
       log.info("Starting to work on the main queue");
       try {
-        ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
+        ZkStateWriter zkStateWriter = null;
         ClusterState clusterState = null;
         boolean refreshClusterState = true; // let's refresh in the first iteration
         while (!this.isClosed) {
@@ -153,6 +153,7 @@ public class Overseer implements Closeab
             try {
               reader.updateClusterState();
               clusterState = reader.getClusterState();
+              zkStateWriter = new ZkStateWriter(reader, stats);
               refreshClusterState = false;
 
               // if there were any errors while processing
@@ -187,7 +188,7 @@ public class Overseer implements Closeab
             }
           }
 
-          DistributedQueue.QueueEvent head = null;
+          byte[] head = null;
           try {
             head = stateUpdateQueue.peek(true);
           } catch (KeeperException e) {
@@ -206,8 +207,8 @@ public class Overseer implements Closeab
           }
           try {
             while (head != null) {
-              final byte[] data = head.getBytes();
-              final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+              byte[] data = head;
+              final ZkNodeProps message = ZkNodeProps.load(data);
               log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
               // we can batch here because workQueue is our fallback in case a ZK write failed
               clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@@ -237,13 +238,16 @@ public class Overseer implements Closeab
             // clean work queue
             while (workQueue.poll() != null);
 
+          } catch (KeeperException.BadVersionException bve) {
+            log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state", bve);
+            refreshClusterState = true;
           } catch (KeeperException e) {
             if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
               log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
               return;
             }
             log.error("Exception in Overseer main queue loop", e);
-            refreshClusterState = true; // it might have been a bad version error
+            refreshClusterState = true; // force refresh state in case of all errors
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return;
@@ -305,7 +309,7 @@ public class Overseer implements Closeab
       try {
         Map m = (Map) Utils.fromJSON(data);
         String id = (String) m.get("id");
-        if(overseerCollectionProcessor.getId().equals(id)){
+        if(overseerCollectionConfigSetProcessor.getId().equals(id)){
           try {
             log.info("I'm exiting , but I'm still the leader");
             zkClient.delete(path,stat.getVersion(),true);
@@ -386,7 +390,7 @@ public class Overseer implements Closeab
           case QUIT:
             if (myId.equals(message.get("id"))) {
               log.info("Quit command received {}", LeaderElector.getNodeName(myId));
-              overseerCollectionProcessor.close();
+              overseerCollectionConfigSetProcessor.close();
               close();
             } else {
               log.warn("Overseer received wrong QUIT message {}", message);
@@ -782,7 +786,7 @@ public class Overseer implements Closeab
 
   private final String adminPath;
 
-  private OverseerCollectionProcessor overseerCollectionProcessor;
+  private OverseerCollectionConfigSetProcessor overseerCollectionConfigSetProcessor;
 
   private ZkController zkController;
 
@@ -820,8 +824,8 @@ public class Overseer implements Closeab
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
 
     OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, adminPath, shardHandler.getShardHandlerFactory());
-    overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
-    ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "OverseerCollectionProcessor-" + id);
+    overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
+    ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
     ccThread.setDaemon(true);
     
     ThreadGroup ohcfTg = new ThreadGroup("Overseer Hdfs SolrCore Failover Thread.");
@@ -918,15 +922,27 @@ public class Overseer implements Closeab
   }
   
   /* Collection creation queue */
-  static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
+  static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
     return getCollectionQueue(zkClient, new Stats());
   }
 
-  static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats)  {
+  static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats)  {
     createOverseerNode(zkClient);
-    return new DistributedQueue(zkClient, "/overseer/collection-queue-work", zkStats);
+    return new OverseerTaskQueue(zkClient, "/overseer/collection-queue-work", zkStats);
   }
-  
+
+  /* The queue for ConfigSet related operations */
+  static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient)  {
+    return getConfigSetQueue(zkClient, new Stats());
+  }
+
+  static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats)  {
+    // For now, we use the same queue as the collection queue, but ensure
+    // that the actions are prefixed with a unique string.
+    createOverseerNode(zkClient);
+    return getCollectionQueue(zkClient, zkStats);
+  }
+
   private static void createOverseerNode(final SolrZkClient zkClient) {
     try {
       zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Wed Sep  2 13:06:13 2015
@@ -97,7 +97,7 @@ import static org.apache.solr.common.clo
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
@@ -121,7 +121,10 @@ import static org.apache.solr.common.par
 import static org.apache.solr.common.util.StrUtils.formatString;
 import static org.apache.solr.common.util.Utils.makeMap;
 
-
+/**
+ * A {@link OverseerMessageHandler} that handles Collections API related
+ * overseer messages.
+ */
 public class OverseerCollectionMessageHandler implements OverseerMessageHandler {
 
   public static final String NUM_SLICES = "numShards";
@@ -203,7 +206,7 @@ public class OverseerCollectionMessageHa
   @Override
   @SuppressWarnings("unchecked")
   public SolrResponse processMessage(ZkNodeProps message, String operation) {
-    log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString());
+    log.warn("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
 
     NamedList results = new NamedList();
     try {
@@ -306,7 +309,7 @@ public class OverseerCollectionMessageHa
   @SuppressWarnings("unchecked")
   private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
-        NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
+        CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
@@ -314,7 +317,7 @@ public class OverseerCollectionMessageHa
     params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
     params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
-    params.set(NODE_NAME_PROP, message.getStr(NODE_NAME_PROP));
+    params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
     params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
     params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
 
@@ -371,7 +374,7 @@ public class OverseerCollectionMessageHa
 
   @SuppressWarnings("unchecked")
   private void getOverseerStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
-    String leaderNode = OverseerProcessor.getLeaderNode(zkStateReader.getZkClient());
+    String leaderNode = OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient());
     results.add("leader", leaderNode);
     Stat stat = new Stat();
     zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
@@ -2006,7 +2009,7 @@ public class OverseerCollectionMessageHa
       boolean created = false;
       while (! waitUntil.hasTimedOut()) {
         Thread.sleep(100);
-        created = zkStateReader.getClusterState().getCollections().contains(collectionName);
+        created = zkStateReader.getClusterState().hasCollection(collectionName);
         if(created) break;
       }
       if (!created)
@@ -2470,7 +2473,7 @@ public class OverseerCollectionMessageHa
 
   @Override
   public String getName() {
-    return "Overseer Collection Processor";
+    return "Overseer Collection Message Handler";
   }
 
   @Override
@@ -2495,7 +2498,7 @@ public class OverseerCollectionMessageHa
   }
 
   @Override
-  public void unmarkExclusiveTask(String collectionName, String operation) {
+  public void unmarkExclusiveTask(String collectionName, String operation, ZkNodeProps message) {
     if(!CLUSTERSTATUS.isEqual(operation) && collectionName != null) {
       synchronized (collectionWip) {
         collectionWip.remove(collectionName);
@@ -2510,8 +2513,10 @@ public class OverseerCollectionMessageHa
     if(CLUSTERSTATUS.isEqual(message.getStr(Overseer.QUEUE_OPERATION)))
       return ExclusiveMarking.EXCLUSIVE;
 
-    if(collectionWip.contains(collectionName))
-      return ExclusiveMarking.NONEXCLUSIVE;
+    synchronized (collectionWip) {
+      if(collectionWip.contains(collectionName))
+        return ExclusiveMarking.NONEXCLUSIVE;
+    }
 
     return ExclusiveMarking.NOTDETERMINED;
   }

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java Wed Sep  2 13:06:13 2015
@@ -21,7 +21,7 @@ import org.apache.solr.client.solrj.Solr
 import org.apache.solr.common.cloud.ZkNodeProps;
 
 /**
- * Interface for processing messages received by an {@link OverseerProcessor}
+ * Interface for processing messages received by an {@link OverseerTaskProcessor}
  */
 public interface OverseerMessageHandler {
 
@@ -61,8 +61,9 @@ public interface OverseerMessageHandler
   /**
    * @param taskKey the key associated with the task
    * @param operation the operation being processed
+   * @param message the message being processed
    */
-  void unmarkExclusiveTask(String taskKey, String operation);
+  void unmarkExclusiveTask(String taskKey, String operation, ZkNodeProps message);
 
   /**
    * @param taskKey the key associated with the task

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java Wed Sep  2 13:06:13 2015
@@ -61,10 +61,10 @@ public class OverseerNodePrioritizer {
 
     List overseerDesignates = (List) m.get("overseer");
     if(overseerDesignates==null || overseerDesignates.isEmpty()) return;
-    String ldr = OverseerProcessor.getLeaderNode(zk);
+    String ldr = OverseerTaskProcessor.getLeaderNode(zk);
     if(overseerDesignates.contains(ldr)) return;
     log.info("prioritizing overseer nodes at {} overseer designates are {}", overseerId, overseerDesignates);
-    List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
+    List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
     if(electionNodes.size()<2) return;
     log.info("sorted nodes {}", electionNodes);
 
@@ -89,7 +89,7 @@ public class OverseerNodePrioritizer {
     //now ask the current leader to QUIT , so that the designate can takeover
     Overseer.getInQueue(zkStateReader.getZkClient()).offer(
         Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
-            "id", OverseerProcessor.getLeaderId(zkStateReader.getZkClient()))));
+            "id", OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()))));
 
   }
 

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Sep  2 13:06:13 2015
@@ -73,6 +73,7 @@ import org.apache.solr.common.cloud.ZkNo
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.URLUtil;
@@ -116,7 +117,8 @@ public final class ZkController {
   private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
 
   private final DistributedQueue overseerJobQueue;
-  private final DistributedQueue overseerCollectionQueue;
+  private final OverseerTaskQueue overseerCollectionQueue;
+  private final OverseerTaskQueue overseerConfigSetQueue;
 
   private final DistributedMap overseerRunningMap;
   private final DistributedMap overseerCompletedMap;
@@ -168,8 +170,6 @@ public final class ZkController {
   private final ZkCmdExecutor cmdExecutor;
   private final ZkStateReader zkStateReader;
 
-  private final LeaderElector leaderElector;
-
   private final String zkServerAddress;          // example: 127.0.0.1:54062/solr
 
   private final int localHostPort;      // example: 54065
@@ -370,17 +370,18 @@ public final class ZkController {
         } catch (Exception e) {
           log.error("Error trying to stop any Overseer threads", e);
         }
+        closeOutstandingElections(registerOnReconnect);
         markAllAsNotLeader(registerOnReconnect);
       }
     }, zkACLProvider);
 
     this.overseerJobQueue = Overseer.getInQueue(zkClient);
     this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
+    this.overseerConfigSetQueue = Overseer.getConfigSetQueue(zkClient);
     this.overseerRunningMap = Overseer.getRunningMap(zkClient);
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
     this.overseerFailureMap = Overseer.getFailureMap(zkClient);
     cmdExecutor = new ZkCmdExecutor(clientTimeout);
-    leaderElector = new LeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient, new Runnable() {
       @Override
       public void run() {
@@ -477,6 +478,32 @@ public final class ZkController {
       }
     }
   }
+  
+  private void closeOutstandingElections(final CurrentCoreDescriptorProvider registerOnReconnect) {
+    
+    List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+    if (descriptors != null) {
+      for (CoreDescriptor descriptor : descriptors) {
+        closeExistingElectionContext(descriptor);
+      }
+    }
+  }
+  
+  private ContextKey closeExistingElectionContext(CoreDescriptor cd) {
+    // look for old context - if we find it, cancel it
+    String collection = cd.getCloudDescriptor().getCollectionName();
+    final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+    
+    ContextKey contextKey = new ContextKey(collection, coreNodeName);
+    ElectionContext prevContext = electionContexts.get(contextKey);
+    
+    if (prevContext != null) {
+      prevContext.close();
+      electionContexts.remove(contextKey);
+    }
+    
+    return contextKey;
+  }
 
   private void markAllAsNotLeader(
       final CurrentCoreDescriptorProvider registerOnReconnect) {
@@ -633,8 +660,9 @@ public final class ZkController {
     cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
-    cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, zkClient);
-    cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH,"{}".getBytes(StandardCharsets.UTF_8),CreateMode.PERSISTENT, zkClient);
+    byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
+    cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
+    cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
   }
 
   private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
@@ -660,7 +688,7 @@ public final class ZkController {
       if (!zkRunOnly) {
         overseerElector = new LeaderElector(zkClient);
         this.overseer = new Overseer(shardHandler, updateShardHandler,
-            CoreContainer.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
+            CommonParams.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
         ElectionContext context = new OverseerElectionContext(zkClient,
             overseer, getNodeName());
         overseerElector.setup(context);
@@ -1064,11 +1092,12 @@ public final class ZkController {
     props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
     props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
     props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
 
 
     ZkNodeProps ourProps = new ZkNodeProps(props);
 
-
+    LeaderElector leaderElector = new LeaderElector(zkClient, contextKey, electionContexts);
     ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
         collection, coreNodeName, ourProps, this, cc);
 
@@ -1767,10 +1796,14 @@ public final class ZkController {
     return overseerJobQueue;
   }
 
-  public DistributedQueue getOverseerCollectionQueue() {
+  public OverseerTaskQueue getOverseerCollectionQueue() {
     return overseerCollectionQueue;
   }
 
+  public OverseerTaskQueue getOverseerConfigSetQueue() {
+    return overseerConfigSetQueue;
+  }
+
   public DistributedMap getOverseerRunningMap() {
     return overseerRunningMap;
   }
@@ -1868,23 +1901,36 @@ public final class ZkController {
 
   public void rejoinShardLeaderElection(SolrParams params) {
     try {
+      
       String collectionName = params.get(COLLECTION_PROP);
       String shardId = params.get(SHARD_ID_PROP);
-      String nodeName = params.get(NODE_NAME_PROP);
+      String coreNodeName = params.get(CORE_NODE_NAME_PROP);
       String coreName = params.get(CORE_NAME_PROP);
       String electionNode = params.get(ELECTION_NODE_PROP);
       String baseUrl = params.get(BASE_URL_PROP);
 
-      ZkNodeProps zkProps = new ZkNodeProps(CORE_NAME_PROP, coreName, NODE_NAME_PROP, nodeName, COLLECTION_PROP, collectionName,
-          SHARD_ID_PROP, shardId, ELECTION_NODE_PROP, electionNode, BASE_URL_PROP, baseUrl);
-
-      ShardLeaderElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, collectionName,
-          nodeName, zkProps, this, getCoreContainer());
-      LeaderElector elect = new LeaderElector(this.zkClient);
-      context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
-      elect.setup(context);
-
-      elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+      try (SolrCore core = cc.getCore(coreName)) {
+        MDCLoggingContext.setCore(core);
+        
+        log.info("Rejoin the shard leader election.");
+        
+        ContextKey contextKey = new ContextKey(collectionName, coreNodeName);
+        
+        ElectionContext prevContext = electionContexts.get(contextKey);
+        if (prevContext != null) prevContext.cancelElection();
+        
+        ZkNodeProps zkProps = new ZkNodeProps(BASE_URL_PROP, baseUrl, CORE_NAME_PROP, coreName, NODE_NAME_PROP, getNodeName(), CORE_NODE_NAME_PROP, coreNodeName);
+            
+        LeaderElector elect = ((ShardLeaderElectionContextBase) prevContext).getLeaderElector();
+        ShardLeaderElectionContext context = new ShardLeaderElectionContext(elect, shardId, collectionName,
+            coreNodeName, zkProps, this, getCoreContainer());
+            
+        context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
+        elect.setup(context);
+        electionContexts.put(contextKey, context);
+        
+        elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+      }
     } catch (Exception e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
     }

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java Wed Sep  2 13:06:13 2015
@@ -36,8 +36,28 @@ import org.slf4j.LoggerFactory;
 
 import static java.util.Collections.singletonMap;
 
+/**
+ * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for
+ * both stateFormat=1 collection (stored in shared /clusterstate.json in ZK) and stateFormat=2 collections
+ * each of which get their own individual state.json in ZK.
+ *
+ * Updates to the cluster state are specified using the
+ * {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} method. The class buffers updates
+ * to reduce the number of writes to ZK. The buffered updates are flushed during <code>enqueueUpdate</code>
+ * automatically if necessary. The {@link #writePendingUpdates()} can be used to force flush any pending updates.
+ *
+ * If either {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} or {@link #writePendingUpdates()}
+ * throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the
+ * class is suspect and the current instance of the class should be discarded and a new instance should be created
+ * and used for any future updates.
+ */
 public class ZkStateWriter {
+  private static final long MAX_FLUSH_INTERVAL = TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
   private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class);
+
+  /**
+   * Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
+   */
   public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
 
   protected final ZkStateReader reader;
@@ -52,6 +72,12 @@ public class ZkStateWriter {
   protected int lastStateFormat = -1; // sentinel value
   protected String lastCollectionName = null;
 
+  /**
+   * Set to true if we ever get a BadVersionException so that we can disallow future operations
+   * with this instance
+   */
+  protected boolean invalidState = false;
+
   public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) {
     assert zkStateReader != null;
 
@@ -59,7 +85,32 @@ public class ZkStateWriter {
     this.stats = stats;
   }
 
-  public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws Exception {
+  /**
+   * Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified
+   * {@link ClusterState} is returned and it is expected that the caller will use the returned
+   * cluster state for the subsequent invocation of this method.
+   * <p>
+   * The modified state may be buffered or flushed to ZooKeeper depending on the internal buffering
+   * logic of this class. The {@link #hasPendingUpdates()} method may be used to determine if the
+   * last enqueue operation resulted in buffered state. The method {@link #writePendingUpdates()} can
+   * be used to force an immediate flush of pending cluster state changes.
+   *
+   * @param prevState the cluster state information on which the given <code>cmd</code> is applied
+   * @param cmd       the {@link ZkWriteCommand} which specifies the change to be applied to cluster state
+   * @param callback  a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used
+   *                  for any callbacks
+   * @return modified cluster state created after applying <code>cmd</code> to <code>prevState</code>. If
+   * <code>cmd</code> is a no-op ({@link #NO_OP}) then the <code>prevState</code> is returned unmodified.
+   * @throws IllegalStateException if the current instance is no longer usable. The current instance must be
+   *                               discarded.
+   * @throws Exception             on an error in ZK operations or callback. If a flush to ZooKeeper results
+   *                               in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and
+   *                               must be discarded
+   */
+  public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws IllegalStateException, Exception {
+    if (invalidState) {
+      throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
+    }
     if (cmd == NO_OP) return prevState;
 
     if (maybeFlushBefore(cmd)) {
@@ -74,14 +125,24 @@ public class ZkStateWriter {
       callback.onEnqueue();
     }
 
+    /*
+    We need to know if the collection has moved from stateFormat=1 to stateFormat=2 (as a result of MIGRATECLUSTERSTATE)
+     */
+    DocCollection previousCollection = prevState.getCollectionOrNull(cmd.name);
+    boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
+    boolean isCurrentlyStateFormat1 = cmd.collection != null && cmd.collection.getStateFormat() == 1;
+
     if (cmd.collection == null) {
-      isClusterStateModified = true;
+      if (wasPreviouslyStateFormat1) {
+        isClusterStateModified = true;
+      }
       clusterState = prevState.copyWith(cmd.name, null);
       updates.put(cmd.name, null);
     } else {
-      if (cmd.collection.getStateFormat() > 1) {
+      if (!isCurrentlyStateFormat1) {
         updates.put(cmd.name, cmd.collection);
-      } else {
+      }
+      if (isCurrentlyStateFormat1 || wasPreviouslyStateFormat1) {
         isClusterStateModified = true;
       }
       clusterState = prevState.copyWith(cmd.name, cmd.collection);
@@ -129,14 +190,25 @@ public class ZkStateWriter {
       return false;
     lastCollectionName = cmd.name;
     lastStateFormat = cmd.collection.getStateFormat();
-    return System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+    return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL;
   }
 
   public boolean hasPendingUpdates() {
     return !updates.isEmpty() || isClusterStateModified;
   }
 
-  public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
+  /**
+   * Writes all pending updates to ZooKeeper and returns the modified cluster state
+   *
+   * @return the modified cluster state
+   * @throws IllegalStateException if the current instance is no longer usable and must be discarded
+   * @throws KeeperException       if any ZooKeeper operation results in an error
+   * @throws InterruptedException  if the current thread is interrupted
+   */
+  public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+    if (invalidState) {
+      throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
+    }
     if (!hasPendingUpdates()) return clusterState;
     TimerContext timerContext = stats.time("update_state");
     boolean success = false;
@@ -149,6 +221,7 @@ public class ZkStateWriter {
 
           if (c == null) {
             // let's clean up the collections path for this collection
+            log.info("going to delete_collection {}", path);
             reader.getZkClient().clean("/collections/" + name);
           } else if (c.getStateFormat() > 1) {
             byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
@@ -163,7 +236,6 @@ public class ZkStateWriter {
               reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
               DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
               clusterState = clusterState.copyWith(name, newCollection);
-              isClusterStateModified = true;
             }
           } else if (c.getStateFormat() == 1) {
             isClusterStateModified = true;
@@ -175,7 +247,6 @@ public class ZkStateWriter {
 
       if (isClusterStateModified) {
         assert clusterState.getZkClusterStateVersion() >= 0;
-        lastUpdatedTime = System.nanoTime();
         byte[] data = Utils.toJSON(clusterState);
         Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true);
         Set<String> collectionNames = clusterState.getCollections();
@@ -187,7 +258,12 @@ public class ZkStateWriter {
         clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collectionStates);
         isClusterStateModified = false;
       }
+      lastUpdatedTime = System.nanoTime();
       success = true;
+    } catch (KeeperException.BadVersionException bve) {
+      // this is a tragic error, we must disallow usage of this instance
+      invalidState = true;
+      throw bve;
     } finally {
       timerContext.stop();
       if (success) {

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java Wed Sep  2 13:06:13 2015
@@ -21,16 +21,14 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 
-import org.apache.http.client.methods.HttpGet;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.GenericSolrRequest;
 import org.apache.solr.client.solrj.response.SimpleSolrResponse;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -89,7 +87,7 @@ public class SnitchContext implements Re
     //todo batch all requests to the same server
 
     try {
-      SimpleSolrResponse rsp = invoke(snitchInfo.getCoreContainer().getUpdateShardHandler(), url, CoreContainer.CORES_HANDLER_PATH, params);
+      SimpleSolrResponse rsp = invoke(snitchInfo.getCoreContainer().getUpdateShardHandler(), url, CommonParams.CORES_HANDLER_PATH, params);
       Map<String, Object> returnedVal = (Map<String, Object>) rsp.getResponse().get(klas);
       if(exception == null){
 //        log this

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java Wed Sep  2 13:06:13 2015
@@ -36,6 +36,9 @@ public class ConfigSetProperties {
 
   private static final Logger log = LoggerFactory.getLogger(ConfigSetProperties.class);
 
+  public static final String DEFAULT_FILENAME = "configsetprops.json";
+  public static final String IMMUTABLE_CONFIGSET_ARG = "immutable";
+
   /**
    * Return the properties associated with the ConfigSet (e.g. immutable)
    *
@@ -55,9 +58,19 @@ public class ConfigSetProperties {
     }
 
     try {
+      return readFromInputStream(reader);
+    } finally {
+      IOUtils.closeQuietly(reader);
+    }
+  }
+
+  public static NamedList readFromInputStream(InputStreamReader reader) {
+    try {
       JSONParser jsonParser = new JSONParser(reader);
       Object object = ObjectBuilder.getVal(jsonParser);
       if (!(object instanceof Map)) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid JSON type " + object.getClass().getName() + ", expected Map");
+        // null also fails the instanceof check, so describe the type without dereferencing object
+        final String objectClass = object == null ? "null" : object.getClass().getName();
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid JSON type " + objectClass + ", expected Map");
       }
       return new NamedList((Map)object);

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreContainer.java Wed Sep  2 13:06:13 2015
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -46,6 +45,7 @@ import org.apache.solr.common.util.IOUti
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.handler.admin.ConfigSetsHandler;
 import org.apache.solr.handler.admin.CoreAdminHandler;
 import org.apache.solr.handler.admin.InfoHandler;
 import org.apache.solr.handler.admin.SecurityConfHandler;
@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Collections.EMPTY_MAP;
+import static org.apache.solr.common.params.CommonParams.*;
 import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
 
 
@@ -97,6 +98,7 @@ public class CoreContainer {
   protected CoreAdminHandler coreAdminHandler = null;
   protected CollectionsHandler collectionsHandler = null;
   private InfoHandler infoHandler;
+  protected ConfigSetsHandler configSetsHandler = null;
 
   private PKIAuthenticationPlugin pkiAuthenticationPlugin;
 
@@ -126,10 +128,6 @@ public class CoreContainer {
 
   private final JarRepository jarRepository = new JarRepository(this);
 
-  public static final String CORES_HANDLER_PATH = "/admin/cores";
-  public static final String COLLECTIONS_HANDLER_PATH = "/admin/collections";
-  public static final String INFO_HANDLER_PATH = "/admin/info";
-
   private PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
 
   private boolean asyncSolrCoreLoad;
@@ -407,8 +405,10 @@ public class CoreContainer {
     containerHandlers.put(INFO_HANDLER_PATH, infoHandler);
     coreAdminHandler   = createHandler(cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
     containerHandlers.put(CORES_HANDLER_PATH, coreAdminHandler);
-    containerHandlers.put("/admin/authorization", securityConfHandler);
-    containerHandlers.put("/admin/authentication", securityConfHandler);
+    configSetsHandler = createHandler(cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
+    containerHandlers.put(CONFIGSETS_HANDLER_PATH, configSetsHandler);
+    containerHandlers.put(AUTHZ_PATH, securityConfHandler);
+    containerHandlers.put(AUTHC_PATH, securityConfHandler);
     if(pkiAuthenticationPlugin != null)
       containerHandlers.put(PKIAuthenticationPlugin.PATH, pkiAuthenticationPlugin.getRequestHandler());
 
@@ -481,7 +481,7 @@ public class CoreContainer {
                 }
               }
             } finally {
-              ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor);
+              ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
             }
           }
         };
@@ -1040,6 +1040,10 @@ public class CoreContainer {
     return infoHandler;
   }
 
+  public ConfigSetsHandler getConfigSetsHandler() {
+    return configSetsHandler;
+  }
+
   public String getHostName() {
     return this.hostName;
   }

Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java Wed Sep  2 13:06:13 2015
@@ -84,7 +84,7 @@ public class CoreDescriptor {
   private static ImmutableMap<String, String> defaultProperties = new ImmutableMap.Builder<String, String>()
       .put(CORE_CONFIG, "solrconfig.xml")
       .put(CORE_SCHEMA, "schema.xml")
-      .put(CORE_CONFIGSET_PROPERTIES, "configsetprops.json")
+      .put(CORE_CONFIGSET_PROPERTIES, ConfigSetProperties.DEFAULT_FILENAME)
       .put(CORE_DATADIR, "data" + File.separator)
       .put(CORE_TRANSIENT, "false")
       .put(CORE_LOADONSTARTUP, "true")