You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2015/09/02 15:06:22 UTC
svn commit: r1700800 [10/24] - in /lucene/dev/branches/lucene6699: ./
dev-tools/ dev-tools/eclipse/ dev-tools/idea/.idea/ dev-tools/scripts/
lucene/ lucene/analysis/ lucene/analysis/common/
lucene/analysis/common/src/java/org/apache/lucene/analysis/ar/...
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Wed Sep 2 13:06:13 2015
@@ -18,19 +18,21 @@
package org.apache.solr.cloud;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -40,23 +42,60 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A distributed queue from zk recipes.
+ * A distributed queue.
*/
public class DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
-
- private static long DEFAULT_TIMEOUT = 5*60*1000;
-
- private final String dir;
-
- private SolrZkClient zookeeper;
-
- private final String prefix = "qn-";
-
- private final String response_prefix = "qnr-" ;
- private final Overseer.Stats stats;
-
+ static final String PREFIX = "qn-";
+
+ /**
+ * Theory of operation:
+ * <p>
+ * Under ordinary circumstances we neither watch nor poll for children in ZK.
+ * Instead we keep an in-memory list of known child names. When the in-memory
+ * list is exhausted, we then fetch from ZK.
+ * <p>
+ * We only bother setting a child watcher when the queue has no children in ZK.
+ */
+ private static final Object _IMPLEMENTATION_NOTES = null;
+
+ final String dir;
+
+ final SolrZkClient zookeeper;
+
+ final Overseer.Stats stats;
+
+ /**
+ * A lock that guards all of the mutable state that follows.
+ */
+ private final ReentrantLock updateLock = new ReentrantLock();
+
+ /**
+ * Contains the last set of children fetched from ZK. Elements are removed from the head of
+ * this in-memory set as they are consumed from the queue. Due to the distributed nature
+ * of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
+ * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
+ * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
+ */
+ private TreeSet<String> knownChildren = new TreeSet<>();
+
+ /**
+ * Used to wait for the queue to become non-empty; you must hold {@link #updateLock} and verify that
+ * {@link #knownChildren} is empty before waiting on this condition.
+ */
+ private final Condition notEmpty = updateLock.newCondition();
+
+ /**
+ * If non-null, the last watcher to listen for child changes.
+ */
+ private ChildWatcher lastWatcher = null;
+
+ /**
+ * If true, ZK's child list probably doesn't match what's in memory.
+ */
+ private boolean isDirty = true;
+
public DistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
}
@@ -77,554 +116,312 @@ public class DistributedQueue {
this.zookeeper = zookeeper;
this.stats = stats;
}
-
- /**
- * Returns a Map of the children, ordered by id.
- *
- * @param watcher
- * optional watcher on getChildren() operation.
- * @return map from id to child name for all children
- */
- private TreeMap<Long,String> orderedChildren(Watcher watcher)
- throws KeeperException, InterruptedException {
- TreeMap<Long,String> orderedChildren = new TreeMap<>();
-
- List<String> childNames = zookeeper.getChildren(dir, watcher, true);
- stats.setQueueLength(childNames.size());
- for (String childName : childNames) {
- try {
- // Check format
- if (!childName.regionMatches(0, prefix, 0, prefix.length())) {
- LOG.debug("Found child node with improper name: " + childName);
- continue;
- }
- String suffix = childName.substring(prefix.length());
- Long childId = new Long(suffix);
- orderedChildren.put(childId, childName);
- } catch (NumberFormatException e) {
- LOG.warn("Found child node with improper format : " + childName + " "
- + e, e);
- }
- }
-
- return orderedChildren;
- }
-
/**
- * Returns true if the queue contains a task with the specified async id.
+ * Returns the data at the first element of the queue, or null if the queue is
+ * empty.
+ *
+ * @return data at the first element of the queue, or null.
*/
- public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
- throws KeeperException, InterruptedException {
-
- List<String> childNames = zookeeper.getChildren(dir, null, true);
- stats.setQueueLength(childNames.size());
- for (String childName : childNames) {
- if (childName != null) {
- try {
- byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
- if (data != null) {
- ZkNodeProps message = ZkNodeProps.load(data);
- if (message.containsKey(requestIdKey)) {
- LOG.debug(">>>> {}", message.get(requestIdKey));
- if(message.get(requestIdKey).equals(requestId)) return true;
- }
- }
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
- }
- }
+ public byte[] peek() throws KeeperException, InterruptedException {
+ TimerContext time = stats.time(dir + "_peek");
+ try {
+ return firstElement();
+ } finally {
+ time.stop();
}
-
- return false;
}
-
/**
- * Return the head of the queue without modifying the queue.
- *
- * @return the data at the head of the queue.
+ * Returns the data at the first element of the queue, or null if the queue is
+ * empty and block is false.
+ *
+ * @param block if true, blocks until an element enters the queue
+ * @return data at the first element of the queue, or null.
*/
- private QueueEvent element() throws KeeperException,
- InterruptedException {
- TreeMap<Long,String> orderedChildren;
-
- // element, take, and remove follow the same pattern.
- // We want to return the child node with the smallest sequence number.
- // Since other clients are remove()ing and take()ing nodes concurrently,
- // the child with the smallest sequence number in orderedChildren might be
- // gone by the time we check.
- // We don't call getChildren again until we have tried the rest of the nodes
- // in sequence order.
- while (true) {
- try {
- orderedChildren = orderedChildren(null);
- } catch (KeeperException.NoNodeException e) {
- return null;
- }
- if (orderedChildren.size() == 0) return null;
-
- for (String headNode : orderedChildren.values()) {
- if (headNode != null) {
- try {
- return new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode, null, null, true), null);
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
- }
- }
- }
- }
+ public byte[] peek(boolean block) throws KeeperException, InterruptedException {
+ return block ? peek(Long.MAX_VALUE) : peek();
}
-
+
/**
- * Attempts to remove the head of the queue and return it.
- *
- * @return The former head of the queue
+ * Returns the data at the first element of the queue, or null if the queue is
+ * empty after wait ms.
+ *
+ * @param wait max wait time in ms.
+ * @return data at the first element of the queue, or null.
*/
- public byte[] remove() throws NoSuchElementException, KeeperException,
- InterruptedException {
- TreeMap<Long,String> orderedChildren;
- // Same as for element. Should refactor this.
- TimerContext time = stats.time(dir + "_remove");
+ public byte[] peek(long wait) throws KeeperException, InterruptedException {
+ Preconditions.checkArgument(wait > 0);
+ TimerContext time;
+ if (wait == Long.MAX_VALUE) {
+ time = stats.time(dir + "_peek_wait_forever");
+ } else {
+ time = stats.time(dir + "_peek_wait" + wait);
+ }
+ updateLock.lockInterruptibly();
try {
- while (true) {
- try {
- orderedChildren = orderedChildren(null);
- } catch (KeeperException.NoNodeException e) {
- throw new NoSuchElementException();
- }
- if (orderedChildren.size() == 0) throw new NoSuchElementException();
-
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
- try {
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
- }
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+ while (waitNanos > 0) {
+ byte[] result = firstElement();
+ if (result != null) {
+ return result;
}
-
+ waitNanos = notEmpty.awaitNanos(waitNanos);
}
+ return null;
} finally {
+ updateLock.unlock();
time.stop();
}
}
-
+
/**
- * Remove the event and save the response into the other path.
- *
+ * Attempts to remove the head of the queue and return it. Returns null if the
+ * queue is empty.
+ *
+ * @return Head of the queue or null.
*/
- public byte[] remove(QueueEvent event) throws KeeperException,
- InterruptedException {
- TimerContext time = stats.time(dir + "_remove_event");
+ public byte[] poll() throws KeeperException, InterruptedException {
+ TimerContext time = stats.time(dir + "_poll");
try {
- String path = event.getId();
- String responsePath = dir + "/" + response_prefix
- + path.substring(path.lastIndexOf("-") + 1);
- if (zookeeper.exists(responsePath, true)) {
- zookeeper.setData(responsePath, event.getBytes(), true);
- }
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
+ return removeFirst();
} finally {
time.stop();
}
}
/**
- * Watcher that blocks until a WatchedEvent occurs for a znode.
+ * Attempts to remove the head of the queue and return it.
+ *
+ * @return The former head of the queue
*/
- private final class LatchWatcher implements Watcher {
-
- private final Object lock;
- private WatchedEvent event;
- private Event.EventType latchEventType;
-
- LatchWatcher(Object lock) {
- this(lock, null);
- }
-
- LatchWatcher(Event.EventType eventType) {
- this(new Object(), eventType);
- }
-
- LatchWatcher(Object lock, Event.EventType eventType) {
- this.lock = lock;
- this.latchEventType = eventType;
- }
-
- @Override
- public void process(WatchedEvent event) {
- Event.EventType eventType = event.getType();
- // None events are ignored
- // If latchEventType is not null, only fire if the type matches
- if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
- LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
- synchronized (lock) {
- this.event = event;
- lock.notifyAll();
- }
- }
- }
-
- public void await(long timeout) throws InterruptedException {
- synchronized (lock) {
- if (this.event != null) return;
- lock.wait(timeout);
- }
- }
-
- public WatchedEvent getWatchedEvent() {
- return event;
- }
- }
-
- // we avoid creating *many* watches in some cases
- // by saving the childrenWatcher and the children associated - see SOLR-6336
- private LatchWatcher childrenWatcher;
- private TreeMap<Long,String> fetchedChildren;
- private final Object childrenWatcherLock = new Object();
-
- private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
- {
- LatchWatcher watcher;
- TreeMap<Long,String> children;
- synchronized (childrenWatcherLock) {
- watcher = childrenWatcher;
- children = fetchedChildren;
- }
-
- if (watcher == null || watcher.getWatchedEvent() != null) {
- // this watcher is only interested in child change events
- watcher = new LatchWatcher(Watcher.Event.EventType.NodeChildrenChanged);
- while (true) {
- try {
- children = orderedChildren(watcher);
- break;
- } catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
- // go back to the loop and try again
- }
- }
- synchronized (childrenWatcherLock) {
- childrenWatcher = watcher;
- fetchedChildren = children;
- }
- }
-
- while (true) {
- if (!children.isEmpty()) break;
- watcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
- if (watcher.getWatchedEvent() != null) {
- children = orderedChildren(null);
+ public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+ TimerContext time = stats.time(dir + "_remove");
+ try {
+ byte[] result = removeFirst();
+ if (result == null) {
+ throw new NoSuchElementException();
}
- if (wait != Long.MAX_VALUE) break;
+ return result;
+ } finally {
+ time.stop();
}
- return Collections.unmodifiableMap(children);
}
/**
* Removes the head of the queue and returns it, blocks until it succeeds.
- *
+ *
* @return The former head of the queue
*/
public byte[] take() throws KeeperException, InterruptedException {
// Same as for element. Should refactor this.
TimerContext timer = stats.time(dir + "_take");
+ updateLock.lockInterruptibly();
try {
- Map<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
- try {
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
+ while (true) {
+ byte[] result = removeFirst();
+ if (result != null) {
+ return result;
}
+ notEmpty.await();
}
- return null; // shouldn't really reach here..
} finally {
+ updateLock.unlock();
timer.stop();
}
}
-
+
/**
- * Inserts data into queue.
- *
- * @return true if data was successfully added
+ * Inserts data into queue. Successfully calling this method does NOT guarantee
+ * that the element will be immediately available in the in-memory queue. In particular,
+ * calling this method on an empty queue will not necessarily cause {@link #poll()} to
+ * return the offered element. Use a blocking method if you must wait for the offered
+ * element to become visible.
*/
- public boolean offer(byte[] data) throws KeeperException,
- InterruptedException {
+ public void offer(byte[] data) throws KeeperException, InterruptedException {
TimerContext time = stats.time(dir + "_offer");
try {
- return createData(dir + "/" + prefix, data,
- CreateMode.PERSISTENT_SEQUENTIAL) != null;
- } finally {
- time.stop();
- }
- }
-
- /**
- * Inserts data into zookeeper.
- *
- * @return true if data was successfully added
- */
- private String createData(String path, byte[] data, CreateMode mode)
- throws KeeperException, InterruptedException {
- for (;;) {
- try {
- return zookeeper.create(path, data, mode, true);
- } catch (KeeperException.NoNodeException e) {
+ while (true) {
try {
- zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
- } catch (KeeperException.NodeExistsException ne) {
- // someone created it
+ // We don't need to explicitly set isDirty here; if there is a watcher, it will
+ // see the update and set the bit itself; if there is no watcher we can defer
+ // the update anyway.
+ zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+ return;
+ } catch (KeeperException.NoNodeException e) {
+ try {
+ zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException ne) {
+ // someone created it
+ }
}
}
+ } finally {
+ time.stop();
}
}
-
+
+ public Overseer.Stats getStats() {
+ return stats;
+ }
+
/**
- * Offer the data and wait for the response
- *
+ * Returns the name of the first known child node, or {@code null} if the queue is empty.
+ * This is the only place {@link #knownChildren} is ever refilled from ZK; elsewhere stale names are only removed.
+ * The caller must double check that the actual node still exists, since the in-memory
+ * list is inherently stale.
*/
- public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
- InterruptedException {
- TimerContext time = stats.time(dir + "_offer");
+ private String firstChild(boolean remove) throws KeeperException, InterruptedException {
+ updateLock.lockInterruptibly();
try {
- String path = createData(dir + "/" + prefix, data,
- CreateMode.PERSISTENT_SEQUENTIAL);
- String watchID = createData(
- dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
- null, CreateMode.EPHEMERAL);
-
- Object lock = new Object();
- LatchWatcher watcher = new LatchWatcher(lock);
- synchronized (lock) {
- if (zookeeper.exists(watchID, watcher, true) != null) {
- watcher.await(timeout);
- }
+ // Try to fetch the first in-memory child.
+ if (!knownChildren.isEmpty()) {
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
}
- byte[] bytes = zookeeper.getData(watchID, null, null, true);
- zookeeper.delete(watchID, -1, true);
- return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
+
+ if (lastWatcher != null && !isDirty) {
+ // No children, no known updates, and a watcher is already set; nothing we can do.
+ return null;
+ }
+
+ // Try to fetch an updated list of children from ZK.
+ ChildWatcher newWatcher = new ChildWatcher();
+ knownChildren = fetchZkChildren(newWatcher);
+ lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
+ isDirty = false;
+ if (knownChildren.isEmpty()) {
+ return null;
+ }
+ notEmpty.signalAll();
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
} finally {
- time.stop();
+ updateLock.unlock();
}
}
-
+
/**
- * Returns the data at the first element of the queue, or null if the queue is
- * empty.
- *
- * @return data at the first element of the queue, or null.
+ * Return the current set of children from ZK; does not change internal state.
*/
- public byte[] peek() throws KeeperException, InterruptedException {
- TimerContext time = stats.time(dir + "_peek");
- try {
- QueueEvent element = element();
- if (element == null) return null;
- return element.getBytes();
- } finally {
- time.stop();
- }
- }
-
- public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, Long wait)
- throws KeeperException, InterruptedException {
- ArrayList<QueueEvent> topN = new ArrayList<>();
-
- LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
- TimerContext time = null;
- if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
- else time = stats.time(dir + "_peekTopN_wait" + wait);
+ TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
+ while (true) {
+ try {
+ TreeSet<String> orderedChildren = new TreeSet<>();
- try {
- Map<Long, String> orderedChildren = getChildren(wait);
- for (String headNode : orderedChildren.values()) {
- if (headNode != null && topN.size() < n) {
- try {
- String id = dir + "/" + headNode;
- if (excludeSet != null && excludeSet.contains(id)) continue;
- QueueEvent queueEvent = new QueueEvent(id,
- zookeeper.getData(dir + "/" + headNode, null, null, true), null);
- topN.add(queueEvent);
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
- }
- } else {
- if (topN.size() >= 1) {
- printQueueEventsListElementIds(topN);
- return topN;
+ List<String> childNames = zookeeper.getChildren(dir, watcher, true);
+ stats.setQueueLength(childNames.size());
+ for (String childName : childNames) {
+ // Check format
+ if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
+ LOG.debug("Found child node with improper name: " + childName);
+ continue;
}
+ orderedChildren.add(childName);
}
+ return orderedChildren;
+ } catch (KeeperException.NoNodeException e) {
+ zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+ // go back to the loop and try again
}
-
- if (topN.size() > 0 ) {
- printQueueEventsListElementIds(topN);
- return topN;
- }
- return null;
- } finally {
- time.stop();
}
}
- private void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
- if(LOG.isDebugEnabled()) {
- StringBuffer sb = new StringBuffer("[");
- for(QueueEvent queueEvent: topN) {
- sb.append(queueEvent.getId()).append(", ");
+ /**
+ * Return the currently-known set of children from memory. If there are no children,
+ * waits up to {@code waitMillis} for at least one child to become available. May
+ * update the set of known children.
+ */
+ SortedSet<String> getChildren(long waitMillis) throws KeeperException, InterruptedException {
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+ while (waitNanos > 0) {
+ // Trigger a fetch if needed.
+ firstElement();
+ updateLock.lockInterruptibly();
+ try {
+ if (!knownChildren.isEmpty()) {
+ return new TreeSet<>(knownChildren);
+ }
+ waitNanos = notEmpty.awaitNanos(waitNanos);
+ } finally {
+ updateLock.unlock();
}
- sb.append("]");
- LOG.debug("Returning topN elements: {}", sb.toString());
}
+ return Collections.emptySortedSet();
}
-
/**
+ * Return the head of the queue without modifying the queue.
*
- * Gets last element of the Queue without removing it.
+ * @return the data at the head of the queue.
*/
- public String getTailId() throws KeeperException, InterruptedException {
- TreeMap<Long, String> orderedChildren = null;
- orderedChildren = orderedChildren(null);
- if(orderedChildren == null || orderedChildren.isEmpty()) return null;
-
- for(String headNode : orderedChildren.descendingMap().values())
- if (headNode != null) {
+ private byte[] firstElement() throws KeeperException, InterruptedException {
+ while (true) {
+ String firstChild = firstChild(false);
+ if (firstChild == null) {
+ return null;
+ }
+ try {
+ return zookeeper.getData(dir + "/" + firstChild, null, null, true);
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first; remove the stale in-memory entry and retry.
+ updateLock.lockInterruptibly();
try {
- QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
- null, null, true), null);
- return queueEvent.getId();
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
+ knownChildren.remove(firstChild);
+ } finally {
+ updateLock.unlock();
}
}
- return null;
- }
-
- public static class QueueEvent {
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((id == null) ? 0 : id.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
- QueueEvent other = (QueueEvent) obj;
- if (id == null) {
- if (other.id != null) return false;
- } else if (!id.equals(other.id)) return false;
- return true;
- }
-
- private WatchedEvent event = null;
- private String id;
- private byte[] bytes;
-
- QueueEvent(String id, byte[] bytes, WatchedEvent event) {
- this.id = id;
- this.bytes = bytes;
- this.event = event;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getId() {
- return id;
- }
-
- public void setBytes(byte[] bytes) {
- this.bytes = bytes;
- }
-
- public byte[] getBytes() {
- return bytes;
- }
-
- public WatchedEvent getWatchedEvent() {
- return event;
}
-
- }
-
- /**
- * Returns the data at the first element of the queue, or null if the queue is
- * empty and block is false.
- *
- * @param block if true, blocks until an element enters the queue
- * @return data at the first element of the queue, or null.
- */
- public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
- return peek(block ? Long.MAX_VALUE : 0);
}
- /**
- * Returns the data at the first element of the queue, or null if the queue is
- * empty after wait ms.
- *
- * @param wait max wait time in ms.
- * @return data at the first element of the queue, or null.
- */
- public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
- TimerContext time = null;
- if (wait == Long.MAX_VALUE) {
- time = stats.time(dir + "_peek_wait_forever");
- } else {
- time = stats.time(dir + "_peek_wait" + wait);
- }
- try {
- if (wait == 0) {
- return element();
+ private byte[] removeFirst() throws KeeperException, InterruptedException {
+ while (true) {
+ String firstChild = firstChild(true);
+ if (firstChild == null) {
+ return null;
}
-
- Map<Long, String> orderedChildren = getChildren(wait);
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
+ try {
+ String path = dir + "/" + firstChild;
+ byte[] result = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
+ return result;
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first; remove the stale in-memory entry and retry.
+ updateLock.lockInterruptibly();
try {
- byte[] data = zookeeper.getData(path, null, null, true);
- return new QueueEvent(path, data, null);
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
+ knownChildren.remove(firstChild);
+ } finally {
+ updateLock.unlock();
}
}
- return null;
- } finally {
- time.stop();
}
}
-
- /**
- * Attempts to remove the head of the queue and return it. Returns null if the
- * queue is empty.
- *
- * @return Head of the queue or null.
- */
- public byte[] poll() throws KeeperException, InterruptedException {
- TimerContext time = stats.time(dir + "_poll");
+
+ @VisibleForTesting boolean hasWatcher() throws InterruptedException {
+ updateLock.lockInterruptibly();
try {
- return remove();
- } catch (NoSuchElementException e) {
- return null;
+ return lastWatcher != null;
} finally {
- time.stop();
+ updateLock.unlock();
}
}
- public Overseer.Stats getStats() {
- return stats;
+ private class ChildWatcher implements Watcher {
+
+ @Override
+ public void process(WatchedEvent event) {
+ updateLock.lock();
+ try {
+ // this watcher is automatically cleared when fired
+ if (lastWatcher == this) {
+ lastWatcher = null;
+ }
+ // Do no updates in this thread, just signal state back to client threads.
+ isDirty = true;
+ // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
+ notEmpty.signalAll();
+ } finally {
+ updateLock.unlock();
+ }
+ }
}
}
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Sep 2 13:06:13 2015
@@ -2,10 +2,12 @@ package org.apache.solr.cloud;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.Path;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
@@ -30,6 +32,11 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.OpResult.SetDataResult;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,16 +80,16 @@ public abstract class ElectionContext im
}
public void cancelElection() throws InterruptedException, KeeperException {
- if( leaderSeqPath != null ){
+ if (leaderSeqPath != null) {
try {
- log.info("canceling election {}",leaderSeqPath );
+ log.info("Canceling election {}", leaderSeqPath);
zkClient.delete(leaderSeqPath, -1, true);
} catch (NoNodeException e) {
// fine
- log.warn("cancelElection did not find election node to remove {}" ,leaderSeqPath);
+ log.info("cancelElection did not find election node to remove {}", leaderSeqPath);
}
} else {
- log.warn("cancelElection skipped as this context has not been initialized");
+ log.info("cancelElection skipped as this context has not been initialized");
}
}
@@ -104,6 +111,7 @@ class ShardLeaderElectionContextBase ext
protected String shardId;
protected String collection;
protected LeaderElector leaderElector;
+ protected volatile Integer leaderZkNodeParentVersion;
public ShardLeaderElectionContextBase(LeaderElector leaderElector,
final String shardId, final String collection, final String coreNodeName,
@@ -129,25 +137,81 @@ class ShardLeaderElectionContextBase ext
}
@Override
+ public void cancelElection() throws InterruptedException, KeeperException {
+ if (leaderZkNodeParentVersion != null) {
+ try {
+ // We need to be careful and make sure we *only* delete our own leader registration node.
+ // We do this by using a multi and ensuring the parent znode of the leader registration node
+ // matches the version we expect - there is a setData call that increments the parent's znode
+ // version whenever a leader registers.
+ log.info("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion);
+ List<Op> ops = new ArrayList<>(2);
+ ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
+ ops.add(Op.delete(leaderPath, -1));
+ zkClient.multi(ops, true);
+ } catch (KeeperException.NoNodeException nne) {
+ // no problem
+ log.info("No leader registration node found to remove: {}", leaderPath);
+ } catch (KeeperException.BadVersionException bve) {
+ log.info("Cannot remove leader registration node because the current registered node is not ours: {}", leaderPath);
+ // no problem
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ SolrException.log(log, e);
+ }
+ leaderZkNodeParentVersion = null;
+ } else {
+ log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
+ }
+ super.cancelElection();
+ }
+
+ @Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
throws KeeperException, InterruptedException, IOException {
- // register as leader - if an ephemeral is already there, wait just a bit
- // to see if it goes away
+ // register as leader - if an ephemeral is already there, wait to see if it goes away
+ String parent = new Path(leaderPath).getParent().toString();
+ ZkCmdExecutor zcmd = new ZkCmdExecutor(30000);
+ zcmd.ensureExists(parent, zkClient);
+
try {
- RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
- new RetryCmd() {
- @Override
- public void execute() throws Throwable {
- zkClient.makePath(leaderPath, Utils.toJSON(leaderProps), CreateMode.EPHEMERAL, true);
+ RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, new RetryCmd() {
+
+ @Override
+ public void execute() throws InterruptedException, KeeperException {
+ log.info("Creating leader registration node", leaderPath);
+ List<Op> ops = new ArrayList<>(2);
+
+ // We use a multi operation to get the parent node's version, which will
+ // be used to make sure we only remove our own leader registration node.
+ // The setData call used to get the parent version is also the trigger to
+ // increment the version. We also do a sanity check that our leaderSeqPath exists.
+
+ ops.add(Op.check(leaderSeqPath, -1));
+ ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL));
+ ops.add(Op.setData(parent, null, -1));
+ List<OpResult> results;
+
+ results = zkClient.multi(ops, true);
+
+ for (OpResult result : results) {
+ if (result.getType() == ZooDefs.OpCode.setData) {
+ SetDataResult dresult = (SetDataResult) result;
+ Stat stat = dresult.getStat();
+ leaderZkNodeParentVersion = stat.getVersion();
+ return;
}
}
- );
+ assert leaderZkNodeParentVersion != null;
+ }
+ });
} catch (Throwable t) {
if (t instanceof OutOfMemoryError) {
throw (OutOfMemoryError) t;
}
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t);
- }
+ }
assert shardId != null;
ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
@@ -158,6 +222,10 @@ class ShardLeaderElectionContextBase ext
leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
Overseer.getInQueue(zkClient).offer(Utils.toJSON(m));
+ }
+
+ public LeaderElector getLeaderElector() {
+ return leaderElector;
}
}
@@ -203,7 +271,6 @@ final class ShardLeaderElectionContext e
ActionThrottle lt;
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
- cancelElection();
throw new SolrException(ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
}
MDCLoggingContext.setCore(core);
@@ -225,6 +292,13 @@ final class ShardLeaderElectionContext e
waitForReplicasToComeUp(leaderVoteWait);
}
+ if (isClosed) {
+ // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the latter,
+ // we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will
+ // re-register the cores and handle a new leadership election.
+ return;
+ }
+
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
@@ -312,34 +386,38 @@ final class ShardLeaderElectionContext e
}
boolean isLeader = true;
- try {
- super.runLeaderProcess(weAreReplacement, 0);
- } catch (Exception e) {
- isLeader = false;
- SolrException.log(log, "There was a problem trying to register as the leader", e);
-
- try (SolrCore core = cc.getCore(coreName)) {
+ if (!isClosed) {
+ try {
+ super.runLeaderProcess(weAreReplacement, 0);
+ } catch (Exception e) {
+ isLeader = false;
+ SolrException.log(log, "There was a problem trying to register as the leader", e);
- if (core == null) {
- log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
- return;
+ try (SolrCore core = cc.getCore(coreName)) {
+
+ if (core == null) {
+ log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+ return;
+ }
+
+ core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
+
+ // we could not publish ourselves as leader - try and rejoin election
+ rejoinLeaderElection(core);
}
-
- core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
-
- // we could not publish ourselves as leader - try and rejoin election
- rejoinLeaderElection(core);
}
- }
-
- if (isLeader) {
- // check for any replicas in my shard that were set to down by the previous leader
- try {
- startLeaderInitiatedRecoveryOnReplicas(coreName);
- } catch (Exception exc) {
- // don't want leader election to fail because of
- // an error trying to tell others to recover
+
+ if (isLeader) {
+ // check for any replicas in my shard that were set to down by the previous leader
+ try {
+ startLeaderInitiatedRecoveryOnReplicas(coreName);
+ } catch (Exception exc) {
+ // don't want leader election to fail because of
+ // an error trying to tell others to recover
+ }
}
+ } else {
+ cancelElection();
}
} finally {
MDCLoggingContext.clear();
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Wed Sep 2 13:06:13 2015
@@ -22,10 +22,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
+import org.apache.solr.cloud.ZkController.ContextKey;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -70,11 +72,21 @@ public class LeaderElector {
private ElectionWatcher watcher;
+ private Map<ContextKey,ElectionContext> electionContexts;
+ private ContextKey contextKey;
+
public LeaderElector(SolrZkClient zkClient) {
this.zkClient = zkClient;
zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
}
+ public LeaderElector(SolrZkClient zkClient, ContextKey key, Map<ContextKey,ElectionContext> electionContexts) {
+ this.zkClient = zkClient;
+ zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
+ this.electionContexts = electionContexts;
+ this.contextKey = key;
+ }
+
public ElectionContext getContext() {
return context;
}
@@ -140,20 +152,6 @@ public class LeaderElector {
retryElection(context, false);//join at the tail again
return;
}
- // first we delete the node advertising the old leader in case the ephem is still there
- try {
- zkClient.delete(context.leaderPath, -1, true);
- }catch (KeeperException.NoNodeException nne){
- //no problem
- }catch (InterruptedException e){
- throw e;
- } catch (Exception e) {
- //failed to delete the leader node
- log.error("leader elect delete error",e);
- retryElection(context, false);
- return;
- // fine
- }
try {
runIamLeaderProcess(context, replacement);
@@ -280,7 +278,7 @@ public class LeaderElector {
try {
if(joinAtHead){
log.info("Node {} trying to join election at the head", id);
- List<String> nodes = OverseerProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
+ List<String> nodes = OverseerTaskProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
if(nodes.size() <2){
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
CreateMode.EPHEMERAL_SEQUENTIAL, false);
@@ -423,6 +421,9 @@ public class LeaderElector {
void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
ElectionWatcher watcher = this.watcher;
ElectionContext ctx = context.copy();
+ if (electionContexts != null) {
+ electionContexts.put(contextKey, ctx);
+ }
if (watcher != null) watcher.cancel();
this.context.cancelElection();
this.context = ctx;
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/Overseer.java Wed Sep 2 13:06:13 2015
@@ -136,7 +136,7 @@ public class Overseer implements Closeab
log.info("Starting to work on the main queue");
try {
- ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);
+ ZkStateWriter zkStateWriter = null;
ClusterState clusterState = null;
boolean refreshClusterState = true; // let's refresh in the first iteration
while (!this.isClosed) {
@@ -153,6 +153,7 @@ public class Overseer implements Closeab
try {
reader.updateClusterState();
clusterState = reader.getClusterState();
+ zkStateWriter = new ZkStateWriter(reader, stats);
refreshClusterState = false;
// if there were any errors while processing
@@ -187,7 +188,7 @@ public class Overseer implements Closeab
}
}
- DistributedQueue.QueueEvent head = null;
+ byte[] head = null;
try {
head = stateUpdateQueue.peek(true);
} catch (KeeperException e) {
@@ -206,8 +207,8 @@ public class Overseer implements Closeab
}
try {
while (head != null) {
- final byte[] data = head.getBytes();
- final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+ byte[] data = head;
+ final ZkNodeProps message = ZkNodeProps.load(data);
log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
// we can batch here because workQueue is our fallback in case a ZK write failed
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@@ -237,13 +238,16 @@ public class Overseer implements Closeab
// clean work queue
while (workQueue.poll() != null);
+ } catch (KeeperException.BadVersionException bve) {
+ log.warn("Bad version writing to ZK using compare-and-set, will force refresh cluster state", bve);
+ refreshClusterState = true;
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
}
log.error("Exception in Overseer main queue loop", e);
- refreshClusterState = true; // it might have been a bad version error
+ refreshClusterState = true; // force refresh state in case of all errors
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
@@ -305,7 +309,7 @@ public class Overseer implements Closeab
try {
Map m = (Map) Utils.fromJSON(data);
String id = (String) m.get("id");
- if(overseerCollectionProcessor.getId().equals(id)){
+ if(overseerCollectionConfigSetProcessor.getId().equals(id)){
try {
log.info("I'm exiting , but I'm still the leader");
zkClient.delete(path,stat.getVersion(),true);
@@ -386,7 +390,7 @@ public class Overseer implements Closeab
case QUIT:
if (myId.equals(message.get("id"))) {
log.info("Quit command received {}", LeaderElector.getNodeName(myId));
- overseerCollectionProcessor.close();
+ overseerCollectionConfigSetProcessor.close();
close();
} else {
log.warn("Overseer received wrong QUIT message {}", message);
@@ -782,7 +786,7 @@ public class Overseer implements Closeab
private final String adminPath;
- private OverseerCollectionProcessor overseerCollectionProcessor;
+ private OverseerCollectionConfigSetProcessor overseerCollectionConfigSetProcessor;
private ZkController zkController;
@@ -820,8 +824,8 @@ public class Overseer implements Closeab
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, adminPath, shardHandler.getShardHandlerFactory());
- overseerCollectionProcessor = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
- ccThread = new OverseerThread(ccTg, overseerCollectionProcessor, "OverseerCollectionProcessor-" + id);
+ overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
+ ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
ccThread.setDaemon(true);
ThreadGroup ohcfTg = new ThreadGroup("Overseer Hdfs SolrCore Failover Thread.");
@@ -918,15 +922,27 @@ public class Overseer implements Closeab
}
/* Collection creation queue */
- static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
+ static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
return getCollectionQueue(zkClient, new Stats());
}
- static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
+ static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
- return new DistributedQueue(zkClient, "/overseer/collection-queue-work", zkStats);
+ return new OverseerTaskQueue(zkClient, "/overseer/collection-queue-work", zkStats);
}
-
+
+ /* The queue for ConfigSet related operations */
+ static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) {
+ return getConfigSetQueue(zkClient, new Stats());
+ }
+
+ static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
+ // For now, we use the same queue as the collection queue, but ensure
+ // that the actions are prefixed with a unique string.
+ createOverseerNode(zkClient);
+ return getCollectionQueue(zkClient, zkStats);
+ }
+
private static void createOverseerNode(final SolrZkClient zkClient) {
try {
zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Wed Sep 2 13:06:13 2015
@@ -97,7 +97,7 @@ import static org.apache.solr.common.clo
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
@@ -121,7 +121,10 @@ import static org.apache.solr.common.par
import static org.apache.solr.common.util.StrUtils.formatString;
import static org.apache.solr.common.util.Utils.makeMap;
-
+/**
+ * An {@link OverseerMessageHandler} that handles Collections API related
+ * overseer messages.
+ */
public class OverseerCollectionMessageHandler implements OverseerMessageHandler {
public static final String NUM_SLICES = "numShards";
@@ -203,7 +206,7 @@ public class OverseerCollectionMessageHa
@Override
@SuppressWarnings("unchecked")
public SolrResponse processMessage(ZkNodeProps message, String operation) {
- log.warn("OverseerCollectionProcessor.processMessage : "+ operation + " , "+ message.toString());
+ log.warn("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
NamedList results = new NamedList();
try {
@@ -306,7 +309,7 @@ public class OverseerCollectionMessageHa
@SuppressWarnings("unchecked")
private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
- NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
+ CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
@@ -314,7 +317,7 @@ public class OverseerCollectionMessageHa
params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
- params.set(NODE_NAME_PROP, message.getStr(NODE_NAME_PROP));
+ params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
@@ -371,7 +374,7 @@ public class OverseerCollectionMessageHa
@SuppressWarnings("unchecked")
private void getOverseerStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
- String leaderNode = OverseerProcessor.getLeaderNode(zkStateReader.getZkClient());
+ String leaderNode = OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient());
results.add("leader", leaderNode);
Stat stat = new Stat();
zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
@@ -2006,7 +2009,7 @@ public class OverseerCollectionMessageHa
boolean created = false;
while (! waitUntil.hasTimedOut()) {
Thread.sleep(100);
- created = zkStateReader.getClusterState().getCollections().contains(collectionName);
+ created = zkStateReader.getClusterState().hasCollection(collectionName);
if(created) break;
}
if (!created)
@@ -2470,7 +2473,7 @@ public class OverseerCollectionMessageHa
@Override
public String getName() {
- return "Overseer Collection Processor";
+ return "Overseer Collection Message Handler";
}
@Override
@@ -2495,7 +2498,7 @@ public class OverseerCollectionMessageHa
}
@Override
- public void unmarkExclusiveTask(String collectionName, String operation) {
+ public void unmarkExclusiveTask(String collectionName, String operation, ZkNodeProps message) {
if(!CLUSTERSTATUS.isEqual(operation) && collectionName != null) {
synchronized (collectionWip) {
collectionWip.remove(collectionName);
@@ -2510,8 +2513,10 @@ public class OverseerCollectionMessageHa
if(CLUSTERSTATUS.isEqual(message.getStr(Overseer.QUEUE_OPERATION)))
return ExclusiveMarking.EXCLUSIVE;
- if(collectionWip.contains(collectionName))
- return ExclusiveMarking.NONEXCLUSIVE;
+ synchronized (collectionWip) {
+ if(collectionWip.contains(collectionName))
+ return ExclusiveMarking.NONEXCLUSIVE;
+ }
return ExclusiveMarking.NOTDETERMINED;
}
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java Wed Sep 2 13:06:13 2015
@@ -21,7 +21,7 @@ import org.apache.solr.client.solrj.Solr
import org.apache.solr.common.cloud.ZkNodeProps;
/**
- * Interface for processing messages received by an {@link OverseerProcessor}
+ * Interface for processing messages received by an {@link OverseerTaskProcessor}
*/
public interface OverseerMessageHandler {
@@ -61,8 +61,9 @@ public interface OverseerMessageHandler
/**
* @param taskKey the key associated with the task
* @param operation the operation being processed
+ * @param message the message being processed
*/
- void unmarkExclusiveTask(String taskKey, String operation);
+ void unmarkExclusiveTask(String taskKey, String operation, ZkNodeProps message);
/**
* @param taskKey the key associated with the task
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java Wed Sep 2 13:06:13 2015
@@ -61,10 +61,10 @@ public class OverseerNodePrioritizer {
List overseerDesignates = (List) m.get("overseer");
if(overseerDesignates==null || overseerDesignates.isEmpty()) return;
- String ldr = OverseerProcessor.getLeaderNode(zk);
+ String ldr = OverseerTaskProcessor.getLeaderNode(zk);
if(overseerDesignates.contains(ldr)) return;
log.info("prioritizing overseer nodes at {} overseer designates are {}", overseerId, overseerDesignates);
- List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
+ List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
if(electionNodes.size()<2) return;
log.info("sorted nodes {}", electionNodes);
@@ -89,7 +89,7 @@ public class OverseerNodePrioritizer {
//now ask the current leader to QUIT , so that the designate can takeover
Overseer.getInQueue(zkStateReader.getZkClient()).offer(
Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
- "id", OverseerProcessor.getLeaderId(zkStateReader.getZkClient()))));
+ "id", OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()))));
}
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Sep 2 13:06:13 2015
@@ -73,6 +73,7 @@ import org.apache.solr.common.cloud.ZkNo
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil;
@@ -116,7 +117,8 @@ public final class ZkController {
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
private final DistributedQueue overseerJobQueue;
- private final DistributedQueue overseerCollectionQueue;
+ private final OverseerTaskQueue overseerCollectionQueue;
+ private final OverseerTaskQueue overseerConfigSetQueue;
private final DistributedMap overseerRunningMap;
private final DistributedMap overseerCompletedMap;
@@ -168,8 +170,6 @@ public final class ZkController {
private final ZkCmdExecutor cmdExecutor;
private final ZkStateReader zkStateReader;
- private final LeaderElector leaderElector;
-
private final String zkServerAddress; // example: 127.0.0.1:54062/solr
private final int localHostPort; // example: 54065
@@ -370,17 +370,18 @@ public final class ZkController {
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
+ closeOutstandingElections(registerOnReconnect);
markAllAsNotLeader(registerOnReconnect);
}
}, zkACLProvider);
this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
+ this.overseerConfigSetQueue = Overseer.getConfigSetQueue(zkClient);
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
cmdExecutor = new ZkCmdExecutor(clientTimeout);
- leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient, new Runnable() {
@Override
public void run() {
@@ -477,6 +478,32 @@ public final class ZkController {
}
}
}
+
+ private void closeOutstandingElections(final CurrentCoreDescriptorProvider registerOnReconnect) {
+
+ List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+ if (descriptors != null) {
+ for (CoreDescriptor descriptor : descriptors) {
+ closeExistingElectionContext(descriptor);
+ }
+ }
+ }
+
+ private ContextKey closeExistingElectionContext(CoreDescriptor cd) {
+ // look for old context - if we find it, cancel it
+ String collection = cd.getCloudDescriptor().getCollectionName();
+ final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
+ ContextKey contextKey = new ContextKey(collection, coreNodeName);
+ ElectionContext prevContext = electionContexts.get(contextKey);
+
+ if (prevContext != null) {
+ prevContext.close();
+ electionContexts.remove(contextKey);
+ }
+
+ return contextKey;
+ }
private void markAllAsNotLeader(
final CurrentCoreDescriptorProvider registerOnReconnect) {
@@ -633,8 +660,9 @@ public final class ZkController {
cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
- cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, zkClient);
- cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH,"{}".getBytes(StandardCharsets.UTF_8),CreateMode.PERSISTENT, zkClient);
+ byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
+ cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
+ cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
}
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
@@ -660,7 +688,7 @@ public final class ZkController {
if (!zkRunOnly) {
overseerElector = new LeaderElector(zkClient);
this.overseer = new Overseer(shardHandler, updateShardHandler,
- CoreContainer.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
+ CommonParams.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
ElectionContext context = new OverseerElectionContext(zkClient,
overseer, getNodeName());
overseerElector.setup(context);
@@ -1064,11 +1092,12 @@ public final class ZkController {
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
ZkNodeProps ourProps = new ZkNodeProps(props);
-
+ LeaderElector leaderElector = new LeaderElector(zkClient, contextKey, electionContexts);
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
collection, coreNodeName, ourProps, this, cc);
@@ -1767,10 +1796,14 @@ public final class ZkController {
return overseerJobQueue;
}
- public DistributedQueue getOverseerCollectionQueue() {
+ public OverseerTaskQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}
+ public OverseerTaskQueue getOverseerConfigSetQueue() {
+ return overseerConfigSetQueue;
+ }
+
public DistributedMap getOverseerRunningMap() {
return overseerRunningMap;
}
@@ -1868,23 +1901,36 @@ public final class ZkController {
public void rejoinShardLeaderElection(SolrParams params) {
try {
+
String collectionName = params.get(COLLECTION_PROP);
String shardId = params.get(SHARD_ID_PROP);
- String nodeName = params.get(NODE_NAME_PROP);
+ String coreNodeName = params.get(CORE_NODE_NAME_PROP);
String coreName = params.get(CORE_NAME_PROP);
String electionNode = params.get(ELECTION_NODE_PROP);
String baseUrl = params.get(BASE_URL_PROP);
- ZkNodeProps zkProps = new ZkNodeProps(CORE_NAME_PROP, coreName, NODE_NAME_PROP, nodeName, COLLECTION_PROP, collectionName,
- SHARD_ID_PROP, shardId, ELECTION_NODE_PROP, electionNode, BASE_URL_PROP, baseUrl);
-
- ShardLeaderElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, collectionName,
- nodeName, zkProps, this, getCoreContainer());
- LeaderElector elect = new LeaderElector(this.zkClient);
- context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
- elect.setup(context);
-
- elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+ try (SolrCore core = cc.getCore(coreName)) {
+ MDCLoggingContext.setCore(core);
+
+ log.info("Rejoin the shard leader election.");
+
+ ContextKey contextKey = new ContextKey(collectionName, coreNodeName);
+
+ ElectionContext prevContext = electionContexts.get(contextKey);
+ if (prevContext != null) prevContext.cancelElection();
+
+ ZkNodeProps zkProps = new ZkNodeProps(BASE_URL_PROP, baseUrl, CORE_NAME_PROP, coreName, NODE_NAME_PROP, getNodeName(), CORE_NODE_NAME_PROP, coreNodeName);
+
+ LeaderElector elect = ((ShardLeaderElectionContextBase) prevContext).getLeaderElector();
+ ShardLeaderElectionContext context = new ShardLeaderElectionContext(elect, shardId, collectionName,
+ coreNodeName, zkProps, this, getCoreContainer());
+
+ context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
+ elect.setup(context);
+ electionContexts.put(contextKey, context);
+
+ elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+ }
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
}
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java Wed Sep 2 13:06:13 2015
@@ -36,8 +36,28 @@ import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
+/**
+ * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for
+ * both stateFormat=1 collections (stored in shared /clusterstate.json in ZK) and stateFormat=2 collections
+ * each of which get their own individual state.json in ZK.
+ *
+ * Updates to the cluster state are specified using the
+ * {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} method. The class buffers updates
+ * to reduce the number of writes to ZK. The buffered updates are flushed during <code>enqueueUpdate</code>
+ * automatically if necessary. The {@link #writePendingUpdates()} method can be used to force flush any pending updates.
+ *
+ * If either {@link #enqueueUpdate(ClusterState, ZkWriteCommand, ZkWriteCallback)} or {@link #writePendingUpdates()}
+ * throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the
+ * class is suspect and the current instance of the class should be discarded and a new instance should be created
+ * and used for any future updates.
+ */
public class ZkStateWriter {
+ private static final long MAX_FLUSH_INTERVAL = TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
private static Logger log = LoggerFactory.getLogger(ZkStateWriter.class);
+
+ /**
+ * Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
+ */
public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
protected final ZkStateReader reader;
@@ -52,6 +72,12 @@ public class ZkStateWriter {
protected int lastStateFormat = -1; // sentinel value
protected String lastCollectionName = null;
+ /**
+ * Set to true if we ever get a BadVersionException so that we can disallow future operations
+ * with this instance
+ */
+ protected boolean invalidState = false;
+
public ZkStateWriter(ZkStateReader zkStateReader, Overseer.Stats stats) {
assert zkStateReader != null;
@@ -59,7 +85,32 @@ public class ZkStateWriter {
this.stats = stats;
}
- public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws Exception {
+ /**
+ * Applies the given {@link ZkWriteCommand} on the <code>prevState</code>. The modified
+ * {@link ClusterState} is returned and it is expected that the caller will use the returned
+ * cluster state for the subsequent invocation of this method.
+ * <p>
+ * The modified state may be buffered or flushed to ZooKeeper depending on the internal buffering
+ * logic of this class. The {@link #hasPendingUpdates()} method may be used to determine if the
+ * last enqueue operation resulted in buffered state. The method {@link #writePendingUpdates()} can
+ * be used to force an immediate flush of pending cluster state changes.
+ *
+ * @param prevState the cluster state information on which the given <code>cmd</code> is applied
+ * @param cmd the {@link ZkWriteCommand} which specifies the change to be applied to cluster state
+ * @param callback a {@link org.apache.solr.cloud.overseer.ZkStateWriter.ZkWriteCallback} object to be used
+ * for any callbacks
+ * @return modified cluster state created after applying <code>cmd</code> to <code>prevState</code>. If
+ * <code>cmd</code> is a no-op ({@link #NO_OP}) then the <code>prevState</code> is returned unmodified.
+ * @throws IllegalStateException if the current instance is no longer usable. The current instance must be
+ * discarded.
+ * @throws Exception on an error in ZK operations or callback. If a flush to ZooKeeper results
+ * in a {@link org.apache.zookeeper.KeeperException.BadVersionException} this instance becomes unusable and
+ * must be discarded
+ */
+ public ClusterState enqueueUpdate(ClusterState prevState, ZkWriteCommand cmd, ZkWriteCallback callback) throws IllegalStateException, Exception {
+ if (invalidState) {
+ throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
+ }
if (cmd == NO_OP) return prevState;
if (maybeFlushBefore(cmd)) {
@@ -74,14 +125,24 @@ public class ZkStateWriter {
callback.onEnqueue();
}
+ /*
+ We need to know if the collection has moved from stateFormat=1 to stateFormat=2 (as a result of MIGRATECLUSTERSTATE)
+ */
+ DocCollection previousCollection = prevState.getCollectionOrNull(cmd.name);
+ boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
+ boolean isCurrentlyStateFormat1 = cmd.collection != null && cmd.collection.getStateFormat() == 1;
+
if (cmd.collection == null) {
- isClusterStateModified = true;
+ if (wasPreviouslyStateFormat1) {
+ isClusterStateModified = true;
+ }
clusterState = prevState.copyWith(cmd.name, null);
updates.put(cmd.name, null);
} else {
- if (cmd.collection.getStateFormat() > 1) {
+ if (!isCurrentlyStateFormat1) {
updates.put(cmd.name, cmd.collection);
- } else {
+ }
+ if (isCurrentlyStateFormat1 || wasPreviouslyStateFormat1) {
isClusterStateModified = true;
}
clusterState = prevState.copyWith(cmd.name, cmd.collection);
@@ -129,14 +190,25 @@ public class ZkStateWriter {
return false;
lastCollectionName = cmd.name;
lastStateFormat = cmd.collection.getStateFormat();
- return System.nanoTime() - lastUpdatedTime > TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+ return System.nanoTime() - lastUpdatedTime > MAX_FLUSH_INTERVAL;
}
public boolean hasPendingUpdates() {
return !updates.isEmpty() || isClusterStateModified;
}
- public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
+ /**
+ * Writes all pending updates to ZooKeeper and returns the modified cluster state
+ *
+ * @return the modified cluster state
+ * @throws IllegalStateException if the current instance is no longer usable and must be discarded
+ * @throws KeeperException if any ZooKeeper operation results in an error
+ * @throws InterruptedException if the current thread is interrupted
+ */
+ public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+ if (invalidState) {
+ throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
+ }
if (!hasPendingUpdates()) return clusterState;
TimerContext timerContext = stats.time("update_state");
boolean success = false;
@@ -149,6 +221,7 @@ public class ZkStateWriter {
if (c == null) {
// let's clean up the collections path for this collection
+ log.info("going to delete_collection {}", path);
reader.getZkClient().clean("/collections/" + name);
} else if (c.getStateFormat() > 1) {
byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
@@ -163,7 +236,6 @@ public class ZkStateWriter {
reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
clusterState = clusterState.copyWith(name, newCollection);
- isClusterStateModified = true;
}
} else if (c.getStateFormat() == 1) {
isClusterStateModified = true;
@@ -175,7 +247,6 @@ public class ZkStateWriter {
if (isClusterStateModified) {
assert clusterState.getZkClusterStateVersion() >= 0;
- lastUpdatedTime = System.nanoTime();
byte[] data = Utils.toJSON(clusterState);
Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true);
Set<String> collectionNames = clusterState.getCollections();
@@ -187,7 +258,12 @@ public class ZkStateWriter {
clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collectionStates);
isClusterStateModified = false;
}
+ lastUpdatedTime = System.nanoTime();
success = true;
+ } catch (KeeperException.BadVersionException bve) {
+ // this is a tragic error, we must disallow usage of this instance
+ invalidState = true;
+ throw bve;
} finally {
timerContext.stop();
if (success) {
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/cloud/rule/SnitchContext.java Wed Sep 2 13:06:13 2015
@@ -21,16 +21,14 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import org.apache.http.client.methods.HttpGet;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.response.SimpleSolrResponse;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@@ -89,7 +87,7 @@ public class SnitchContext implements Re
//todo batch all requests to the same server
try {
- SimpleSolrResponse rsp = invoke(snitchInfo.getCoreContainer().getUpdateShardHandler(), url, CoreContainer.CORES_HANDLER_PATH, params);
+ SimpleSolrResponse rsp = invoke(snitchInfo.getCoreContainer().getUpdateShardHandler(), url, CommonParams.CORES_HANDLER_PATH, params);
Map<String, Object> returnedVal = (Map<String, Object>) rsp.getResponse().get(klas);
if(exception == null){
// log this
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/ConfigSetProperties.java Wed Sep 2 13:06:13 2015
@@ -36,6 +36,9 @@ public class ConfigSetProperties {
private static final Logger log = LoggerFactory.getLogger(ConfigSetProperties.class);
+ public static final String DEFAULT_FILENAME = "configsetprops.json";
+ public static final String IMMUTABLE_CONFIGSET_ARG = "immutable";
+
/**
* Return the properties associated with the ConfigSet (e.g. immutable)
*
@@ -55,9 +58,18 @@ public class ConfigSetProperties {
}
try {
+ return readFromInputStream(reader);
+ } finally {
+ IOUtils.closeQuietly(reader);
+ }
+ }
+
+ public static NamedList readFromInputStream(InputStreamReader reader) {
+ try {
JSONParser jsonParser = new JSONParser(reader);
Object object = ObjectBuilder.getVal(jsonParser);
if (!(object instanceof Map)) {
+ final String objectClass = object == null ? "null" : object.getClass().getName();
- throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid JSON type " + object.getClass().getName() + ", expected Map");
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid JSON type " + objectClass + ", expected Map");
}
return new NamedList((Map)object);
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreContainer.java Wed Sep 2 13:06:13 2015
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -46,6 +45,7 @@ import org.apache.solr.common.util.IOUti
import org.apache.solr.common.util.Utils;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.InfoHandler;
import org.apache.solr.handler.admin.SecurityConfHandler;
@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Collections.EMPTY_MAP;
+import static org.apache.solr.common.params.CommonParams.*;
import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
@@ -97,6 +98,7 @@ public class CoreContainer {
protected CoreAdminHandler coreAdminHandler = null;
protected CollectionsHandler collectionsHandler = null;
private InfoHandler infoHandler;
+ protected ConfigSetsHandler configSetsHandler = null;
private PKIAuthenticationPlugin pkiAuthenticationPlugin;
@@ -126,10 +128,6 @@ public class CoreContainer {
private final JarRepository jarRepository = new JarRepository(this);
- public static final String CORES_HANDLER_PATH = "/admin/cores";
- public static final String COLLECTIONS_HANDLER_PATH = "/admin/collections";
- public static final String INFO_HANDLER_PATH = "/admin/info";
-
private PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
private boolean asyncSolrCoreLoad;
@@ -407,8 +405,10 @@ public class CoreContainer {
containerHandlers.put(INFO_HANDLER_PATH, infoHandler);
coreAdminHandler = createHandler(cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
containerHandlers.put(CORES_HANDLER_PATH, coreAdminHandler);
- containerHandlers.put("/admin/authorization", securityConfHandler);
- containerHandlers.put("/admin/authentication", securityConfHandler);
+ configSetsHandler = createHandler(cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
+ containerHandlers.put(CONFIGSETS_HANDLER_PATH, configSetsHandler);
+ containerHandlers.put(AUTHZ_PATH, securityConfHandler);
+ containerHandlers.put(AUTHC_PATH, securityConfHandler);
if(pkiAuthenticationPlugin != null)
containerHandlers.put(PKIAuthenticationPlugin.PATH, pkiAuthenticationPlugin.getRequestHandler());
@@ -481,7 +481,7 @@ public class CoreContainer {
}
}
} finally {
- ExecutorUtil.shutdownNowAndAwaitTermination(coreLoadExecutor);
+ ExecutorUtil.shutdownAndAwaitTermination(coreLoadExecutor);
}
}
};
@@ -1040,6 +1040,10 @@ public class CoreContainer {
return infoHandler;
}
+ public ConfigSetsHandler getConfigSetsHandler() {
+ return configSetsHandler;
+ }
+
public String getHostName() {
return this.hostName;
}
Modified: lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java?rev=1700800&r1=1700799&r2=1700800&view=diff
==============================================================================
--- lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java (original)
+++ lucene/dev/branches/lucene6699/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java Wed Sep 2 13:06:13 2015
@@ -84,7 +84,7 @@ public class CoreDescriptor {
private static ImmutableMap<String, String> defaultProperties = new ImmutableMap.Builder<String, String>()
.put(CORE_CONFIG, "solrconfig.xml")
.put(CORE_SCHEMA, "schema.xml")
- .put(CORE_CONFIGSET_PROPERTIES, "configsetprops.json")
+ .put(CORE_CONFIGSET_PROPERTIES, ConfigSetProperties.DEFAULT_FILENAME)
.put(CORE_DATADIR, "data" + File.separator)
.put(CORE_TRANSIENT, "false")
.put(CORE_LOADONSTARTUP, "true")