You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@lucene.apache.org by sh...@apache.org on 2014/03/23 08:22:29 UTC
svn commit: r1580463 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/handler/admin/
core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/params/
Author: shalin
Date: Sun Mar 23 07:22:28 2014
New Revision: 1580463
URL: http://svn.apache.org/r1580463
Log:
SOLR-5749: A new Overseer status collection API exposes overseer queue sizes, timing statistics, success and error counts and last N failures per operation
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1580463&r1=1580462&r2=1580463&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sun Mar 23 07:22:28 2014
@@ -143,6 +143,9 @@ New Features
improve logging and force refresh cluster state every 15 seconds.
(Timothy Potter via shalin)
+* SOLR-5749: A new Overseer status collection API exposes overseer queue sizes, timing
+ statistics, success and error counts and last N failures per operation. (shalin)
+
Bug Fixes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1580463&r1=1580462&r2=1580463&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Sun Mar 23 07:22:28 2014
@@ -26,6 +26,7 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -52,10 +53,16 @@ public class DistributedQueue {
private final String prefix = "qn-";
private final String response_prefix = "qnr-" ;
+
+ private final Overseer.Stats stats;
public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl) {
+ this(zookeeper, dir, acl, new Overseer.Stats());
+ }
+
+ public DistributedQueue(SolrZkClient zookeeper, String dir, List<ACL> acl, Overseer.Stats stats) {
this.dir = dir;
-
+
ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
try {
cmdExecutor.ensureExists(dir, zookeeper);
@@ -65,12 +72,12 @@ public class DistributedQueue {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
-
+
if (acl != null) {
this.acl = acl;
}
this.zookeeper = zookeeper;
-
+ this.stats = stats;
}
/**
@@ -155,25 +162,30 @@ public class DistributedQueue {
InterruptedException {
TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this.
- while (true) {
- try {
- orderedChildren = orderedChildren(null);
- } catch (KeeperException.NoNodeException e) {
- throw new NoSuchElementException();
- }
- if (orderedChildren.size() == 0) throw new NoSuchElementException();
-
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
+ TimerContext time = stats.time(dir + "_remove");
+ try {
+ while (true) {
try {
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
+ orderedChildren = orderedChildren(null);
} catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
+ throw new NoSuchElementException();
+ }
+ if (orderedChildren.size() == 0) throw new NoSuchElementException();
+
+ for (String headNode : orderedChildren.values()) {
+ String path = dir + "/" + headNode;
+ try {
+ byte[] data = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
+ return data;
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first.
+ }
}
+
}
-
+ } finally {
+ time.stop();
}
}
@@ -183,15 +195,20 @@ public class DistributedQueue {
*/
public byte[] remove(QueueEvent event) throws KeeperException,
InterruptedException {
- String path = event.getId();
- String responsePath = dir + "/" + response_prefix
- + path.substring(path.lastIndexOf("-") + 1);
- if (zookeeper.exists(responsePath, true)) {
- zookeeper.setData(responsePath, event.getBytes(), true);
- }
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
+ TimerContext time = stats.time(dir + "_remove_event");
+ try {
+ String path = event.getId();
+ String responsePath = dir + "/" + response_prefix
+ + path.substring(path.lastIndexOf("-") + 1);
+ if (zookeeper.exists(responsePath, true)) {
+ zookeeper.setData(responsePath, event.getBytes(), true);
+ }
+ byte[] data = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
+ return data;
+ } finally {
+ time.stop();
+ }
}
@@ -235,29 +252,34 @@ public class DistributedQueue {
public byte[] take() throws KeeperException, InterruptedException {
TreeMap<Long,String> orderedChildren;
// Same as for element. Should refactor this.
- while (true) {
- LatchChildWatcher childWatcher = new LatchChildWatcher();
- try {
- orderedChildren = orderedChildren(childWatcher);
- } catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
- continue;
- }
- if (orderedChildren.size() == 0) {
- childWatcher.await(DEFAULT_TIMEOUT);
- continue;
- }
-
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
+ TimerContext timer = stats.time(dir + "_take");
+ try {
+ while (true) {
+ LatchChildWatcher childWatcher = new LatchChildWatcher();
try {
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
+ orderedChildren = orderedChildren(childWatcher);
} catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+ continue;
+ }
+ if (orderedChildren.size() == 0) {
+ childWatcher.await(DEFAULT_TIMEOUT);
+ continue;
+ }
+
+ for (String headNode : orderedChildren.values()) {
+ String path = dir + "/" + headNode;
+ try {
+ byte[] data = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
+ return data;
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first.
+ }
}
}
+ } finally {
+ timer.stop();
}
}
@@ -268,8 +290,13 @@ public class DistributedQueue {
*/
public boolean offer(byte[] data) throws KeeperException,
InterruptedException {
- return createData(dir + "/" + prefix, data,
- CreateMode.PERSISTENT_SEQUENTIAL) != null;
+ TimerContext time = stats.time(dir + "_offer");
+ try {
+ return createData(dir + "/" + prefix, data,
+ CreateMode.PERSISTENT_SEQUENTIAL) != null;
+ } finally {
+ time.stop();
+ }
}
/**
@@ -298,21 +325,26 @@ public class DistributedQueue {
*/
public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
InterruptedException {
- String path = createData(dir + "/" + prefix, data,
- CreateMode.PERSISTENT_SEQUENTIAL);
- String watchID = createData(
- dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
- null, CreateMode.EPHEMERAL);
- Object lock = new Object();
- LatchChildWatcher watcher = new LatchChildWatcher(lock);
- synchronized (lock) {
- if (zookeeper.exists(watchID, watcher, true) != null) {
- watcher.await(timeout);
+ TimerContext time = stats.time(dir + "_offer");
+ try {
+ String path = createData(dir + "/" + prefix, data,
+ CreateMode.PERSISTENT_SEQUENTIAL);
+ String watchID = createData(
+ dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
+ null, CreateMode.EPHEMERAL);
+ Object lock = new Object();
+ LatchChildWatcher watcher = new LatchChildWatcher(lock);
+ synchronized (lock) {
+ if (zookeeper.exists(watchID, watcher, true) != null) {
+ watcher.await(timeout);
+ }
}
+ byte[] bytes = zookeeper.getData(watchID, null, null, true);
+ zookeeper.delete(watchID, -1, true);
+ return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
+ } finally {
+ time.stop();
}
- byte[] bytes = zookeeper.getData(watchID, null, null, true);
- zookeeper.delete(watchID, -1, true);
- return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
}
/**
@@ -322,9 +354,14 @@ public class DistributedQueue {
* @return data at the first element of the queue, or null.
*/
public byte[] peek() throws KeeperException, InterruptedException {
+ TimerContext time = stats.time(dir + "_peek");
+ try {
QueueEvent element = element();
- if(element == null) return null;
+ if (element == null) return null;
return element.getBytes();
+ } finally {
+ time.stop();
+ }
}
public static class QueueEvent {
@@ -399,38 +436,48 @@ public class DistributedQueue {
* @return data at the first element of the queue, or null.
*/
public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
- if (wait == 0) {
- return element();
+ TimerContext time = null;
+ if (wait == Long.MAX_VALUE) {
+ time = stats.time(dir + "_peek_wait_forever");
+ } else {
+ time = stats.time(dir + "_peek_wait" + wait);
}
-
- TreeMap<Long,String> orderedChildren;
- boolean waitedEnough = false;
- while (true) {
- LatchChildWatcher childWatcher = new LatchChildWatcher();
- try {
- orderedChildren = orderedChildren(childWatcher);
- } catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
- continue;
- }
- if(waitedEnough) {
- if(orderedChildren.isEmpty()) return null;
- }
- if (orderedChildren.size() == 0) {
- childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT: wait);
- waitedEnough = wait != Long.MAX_VALUE;
- continue;
+ try {
+ if (wait == 0) {
+ return element();
}
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
+ TreeMap<Long, String> orderedChildren;
+ boolean waitedEnough = false;
+ while (true) {
+ LatchChildWatcher childWatcher = new LatchChildWatcher();
try {
- byte[] data = zookeeper.getData(path, null, null, true);
- return new QueueEvent(path, data, childWatcher.getWatchedEvent());
+ orderedChildren = orderedChildren(childWatcher);
} catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
+ zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
+ continue;
+ }
+ if (waitedEnough) {
+ if (orderedChildren.isEmpty()) return null;
+ }
+ if (orderedChildren.size() == 0) {
+ childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
+ waitedEnough = wait != Long.MAX_VALUE;
+ continue;
+ }
+
+ for (String headNode : orderedChildren.values()) {
+ String path = dir + "/" + headNode;
+ try {
+ byte[] data = zookeeper.getData(path, null, null, true);
+ return new QueueEvent(path, data, childWatcher.getWatchedEvent());
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first.
+ }
}
}
+ } finally {
+ time.stop();
}
}
@@ -441,11 +488,17 @@ public class DistributedQueue {
* @return Head of the queue or null.
*/
public byte[] poll() throws KeeperException, InterruptedException {
+ TimerContext time = stats.time(dir + "_poll");
try {
return remove();
} catch (NoSuchElementException e) {
return null;
+ } finally {
+ time.stop();
}
}
-
+
+ public Overseer.Stats getStats() {
+ return stats;
+ }
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1580463&r1=1580462&r2=1580463&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Sun Mar 23 07:22:28 2014
@@ -27,12 +27,16 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ClusterState;
@@ -47,6 +51,9 @@ import org.apache.solr.common.cloud.ZkCo
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.util.stats.Clock;
+import org.apache.solr.util.stats.Timer;
+import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -62,8 +69,11 @@ public class Overseer {
public static final String REMOVESHARD = "removeshard";
public static final String ADD_ROUTING_RULE = "addroutingrule";
public static final String REMOVE_ROUTING_RULE = "removeroutingrule";
+ public static final String STATE = "state";
public static final int STATE_UPDATE_DELAY = 1500; // delay between cloud state updates
+ public static final String CREATESHARD = "createshard";
+ public static final String UPDATESHARDSTATE = "updateshardstate";
private static Logger log = LoggerFactory.getLogger(Overseer.class);
@@ -88,13 +98,16 @@ public class Overseer {
// Internal map which holds the information about failed tasks.
private final DistributedMap failureMap;
+ private final Stats zkStats;
+
private Map clusterProps;
private boolean isClosed = false;
- public ClusterStateUpdater(final ZkStateReader reader, final String myId) {
+ public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
this.zkClient = reader.getZkClient();
- this.stateUpdateQueue = getInQueue(zkClient);
- this.workQueue = getInternalQueue(zkClient);
+ this.zkStats = zkStats;
+ this.stateUpdateQueue = getInQueue(zkClient, zkStats);
+ this.workQueue = getInternalQueue(zkClient, zkStats);
this.failureMap = getFailureMap(zkClient);
this.runningMap = getRunningMap(zkClient);
this.completedMap = getCompletedMap(zkClient);
@@ -102,6 +115,14 @@ public class Overseer {
this.reader = reader;
clusterProps = reader.getClusterProps();
}
+
+ public Stats getStateUpdateQueueStats() {
+ return stateUpdateQueue.getStats();
+ }
+
+ public Stats getWorkQueueStats() {
+ return workQueue.getStats();
+ }
@Override
public void run() {
@@ -133,8 +154,10 @@ public class Overseer {
else if (LeaderStatus.YES == isLeader) {
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message.getStr(QUEUE_OPERATION);
+ final TimerContext timerContext = stats.time(operation);
try {
clusterState = processMessage(clusterState, message, operation);
+ stats.success(operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with
@@ -142,6 +165,9 @@ public class Overseer {
// TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
+ stats.error(operation);
+ } finally {
+ timerContext.stop();
}
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
@@ -208,8 +234,10 @@ public class Overseer {
while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String operation = message.getStr(QUEUE_OPERATION);
+ final TimerContext timerContext = stats.time(operation);
try {
clusterState = processMessage(clusterState, message, operation);
+ stats.success(operation);
} catch (Exception e) {
// generally there is nothing we can do - in most cases, we have
// an issue that will fail again on retry or we cannot communicate with
@@ -217,6 +245,9 @@ public class Overseer {
// TODO: if ordering for the message is not important, we could
// track retries and put it back on the end of the queue
log.error("Overseer could not process the current clusterstate state update message, skipping the message.", e);
+ stats.error(operation);
+ } finally {
+ timerContext.stop();
}
workQueue.offer(head.getBytes());
@@ -253,7 +284,7 @@ public class Overseer {
private ClusterState processMessage(ClusterState clusterState,
final ZkNodeProps message, final String operation) {
- if ("state".equals(operation)) {
+ if (STATE.equals(operation)) {
if( isLegacy( clusterProps )) {
clusterState = updateState(clusterState, message);
} else {
@@ -279,9 +310,9 @@ public class Overseer {
message.getStr(ZkStateReader.SHARD_ID_PROP),
sb.length() > 0 ? sb.toString() : null);
- } else if ("createshard".equals(operation)) {
+ } else if (CREATESHARD.equals(operation)) {
clusterState = createShard(clusterState, message);
- } else if ("updateshardstate".equals(operation)) {
+ } else if (UPDATESHARDSTATE.equals(operation)) {
clusterState = updateShardState(clusterState, message);
} else if (OverseerCollectionProcessor.CREATECOLLECTION.equals(operation)) {
clusterState = buildCollection(clusterState, message);
@@ -1013,7 +1044,14 @@ public class Overseer {
public boolean isClosed() {
return this.isClosed;
}
-
+
+ public DistributedQueue getStateUpdateQueue() {
+ return stateUpdateQueue;
+ }
+
+ public DistributedQueue getWorkQueue() {
+ return workQueue;
+ }
}
static void getShardNames(Integer numShards, List<String> shardNames) {
@@ -1077,11 +1115,15 @@ public class Overseer {
private String adminPath;
private OverseerCollectionProcessor ocp;
+
+ private Stats stats;
+
// overseer not responsible for closing reader
public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
this.reader = reader;
this.shardHandler = shardHandler;
this.adminPath = adminPath;
+ this.stats = new Stats();
}
public void start(String id) {
@@ -1090,12 +1132,12 @@ public class Overseer {
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater.");
- updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id));
+ updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id, stats));
updaterThread.setDaemon(true);
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath);
+ ocp = new OverseerCollectionProcessor(reader, id, shardHandler, adminPath, stats);
ccThread = new OverseerThread(ccTg, ocp, "Overseer-" + id);
ccThread.setDaemon(true);
@@ -1136,14 +1178,18 @@ public class Overseer {
* Get queue that can be used to send messages to Overseer.
*/
public static DistributedQueue getInQueue(final SolrZkClient zkClient) {
+ return getInQueue(zkClient, new Stats());
+ }
+
+ static DistributedQueue getInQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
- return new DistributedQueue(zkClient, "/overseer/queue", null);
+ return new DistributedQueue(zkClient, "/overseer/queue", null, zkStats);
}
/* Internal queue, not to be used outside of Overseer */
- static DistributedQueue getInternalQueue(final SolrZkClient zkClient) {
+ static DistributedQueue getInternalQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
- return new DistributedQueue(zkClient, "/overseer/queue-work", null);
+ return new DistributedQueue(zkClient, "/overseer/queue-work", null, zkStats);
}
/* Internal map for failed tasks, not to be used outside of the Overseer */
@@ -1166,8 +1212,12 @@ public class Overseer {
/* Collection creation queue */
static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
+ return getCollectionQueue(zkClient, new Stats());
+ }
+
+ static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
- return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null);
+ return new DistributedQueue(zkClient, "/overseer/collection-queue-work", null, zkStats);
}
private static void createOverseerNode(final SolrZkClient zkClient) {
@@ -1192,4 +1242,109 @@ public class Overseer {
return reader;
}
+ /**
+ * Used to hold statistics about overseer operations. It will be exposed
+ * to the OverseerCollectionProcessor to return statistics.
+ *
+ * This is experimental API and subject to change.
+ */
+ public static class Stats {
+ static final int MAX_STORED_FAILURES = 10;
+
+ final Map<String, Stat> stats = Collections.synchronizedMap(new HashMap<String, Stat>());
+
+ public Map<String, Stat> getStats() {
+ return stats;
+ }
+
+ public int getSuccessCount(String operation) {
+ Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+ return stat == null ? 0 : stat.success.get();
+ }
+
+ public int getErrorCount(String operation) {
+ Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+ return stat == null ? 0 : stat.errors.get();
+ }
+
+ public void success(String operation) {
+ String op = operation.toLowerCase(Locale.ROOT);
+ Stat stat = stats.get(op);
+ if (stat == null) {
+ stat = new Stat();
+ stats.put(op, stat);
+ }
+ stat.success.incrementAndGet();
+ }
+
+ public void error(String operation) {
+ String op = operation.toLowerCase(Locale.ROOT);
+ Stat stat = stats.get(op);
+ if (stat == null) {
+ stat = new Stat();
+ stats.put(op, stat);
+ }
+ stat.errors.incrementAndGet();
+ }
+
+ public TimerContext time(String operation) {
+ String op = operation.toLowerCase(Locale.ROOT);
+ Stat stat = stats.get(op);
+ if (stat == null) {
+ stat = new Stat();
+ stats.put(op, stat);
+ }
+ return stat.requestTime.time();
+ }
+
+ public void storeFailureDetails(String operation, ZkNodeProps request, SolrResponse resp) {
+ String op = operation.toLowerCase(Locale.ROOT);
+ Stat stat = stats.get(op);
+ if (stat == null) {
+ stat = new Stat();
+ stats.put(op, stat);
+ }
+ LinkedList<FailedOp> failedOps = stat.failureDetails;
+ synchronized (failedOps) {
+ if (failedOps.size() >= MAX_STORED_FAILURES) {
+ failedOps.removeFirst();
+ }
+ failedOps.addLast(new FailedOp(request, resp));
+ }
+ }
+
+ public List<FailedOp> getFailureDetails(String operation) {
+ Stat stat = stats.get(operation.toLowerCase(Locale.ROOT));
+ if (stat == null || stat.failureDetails.isEmpty()) return null;
+ LinkedList<FailedOp> failedOps = stat.failureDetails;
+ synchronized (failedOps) {
+ ArrayList<FailedOp> ret = new ArrayList<>(failedOps);
+ return ret;
+ }
+ }
+ }
+
+ public static class Stat {
+ public final AtomicInteger success;
+ public final AtomicInteger errors;
+ public final Timer requestTime;
+ public final LinkedList<FailedOp> failureDetails;
+
+ public Stat() {
+ this.success = new AtomicInteger();
+ this.errors = new AtomicInteger();
+ this.requestTime = new Timer(TimeUnit.MILLISECONDS, TimeUnit.MINUTES, Clock.defaultClock());
+ this.failureDetails = new LinkedList<>();
+ }
+ }
+
+ public static class FailedOp {
+ public final ZkNodeProps req;
+ public final SolrResponse resp;
+
+ public FailedOp(ZkNodeProps req, SolrResponse resp) {
+ this.req = req;
+ this.resp = resp;
+ }
+ }
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1580463&r1=1580462&r2=1580463&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Sun Mar 23 07:22:28 2014
@@ -59,6 +59,9 @@ import org.apache.solr.handler.component
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.stats.Snapshot;
+import org.apache.solr.util.stats.Timer;
+import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -85,6 +88,7 @@ import static org.apache.solr.common.clo
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
@@ -162,15 +166,18 @@ public class OverseerCollectionProcessor
private ZkStateReader zkStateReader;
private boolean isClosed;
-
- public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
- this(zkStateReader, myId, shardHandler, adminPath, Overseer.getCollectionQueue(zkStateReader.getZkClient()),
+
+ private Overseer.Stats stats;
+
+ public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath, Overseer.Stats stats) {
+ this(zkStateReader, myId, shardHandler, adminPath, stats, Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
Overseer.getRunningMap(zkStateReader.getZkClient()),
Overseer.getCompletedMap(zkStateReader.getZkClient()), Overseer.getFailureMap(zkStateReader.getZkClient()));
}
protected OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler,
String adminPath,
+ Overseer.Stats stats,
DistributedQueue workQueue,
DistributedMap runningMap,
DistributedMap completedMap,
@@ -183,6 +190,7 @@ public class OverseerCollectionProcessor
this.runningMap = runningMap;
this.completedMap = completedMap;
this.failureMap = failureMap;
+ this.stats = stats;
}
@Override
@@ -227,7 +235,13 @@ public class OverseerCollectionProcessor
log.info("Overseer Collection Processor: Get the message id:" + head.getId() + " message:" + message.toString());
final String operation = message.getStr(QUEUE_OPERATION);
- SolrResponse response = processMessage(message, operation);
+ final TimerContext timerContext = stats.time("collection_" + operation); // even if operation is async, it is sync!
+ SolrResponse response = null;
+ try {
+ response = processMessage(message, operation);
+ } finally {
+ timerContext.stop();
+ }
head.setBytes(SolrResponse.serializable(response));
if (!operation.equals(REQUESTSTATUS) && asyncId != null) {
@@ -242,6 +256,13 @@ public class OverseerCollectionProcessor
workQueue.remove(head);
+ if (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null) {
+ stats.error("collection_" + operation);
+ stats.storeFailureDetails("collection_" + operation, message, response);
+ } else {
+ stats.success("collection_" + operation);
+ }
+
log.info("Overseer Collection Processor: Message id:" + head.getId() + " complete, response:"+ response.getResponse().toString());
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED
@@ -451,7 +472,11 @@ public class OverseerCollectionProcessor
addReplica(zkStateReader.getClusterState(), message, results);
} else if (REQUESTSTATUS.equals(operation)) {
requestStatus(message, results);
- } else {
+ } else if (OVERSEERSTATUS.isEqual(operation)) {
+ getOverseerStatus(message, results);
+ }
+
+ else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
}
@@ -469,6 +494,79 @@ public class OverseerCollectionProcessor
return new OverseerSolrResponse(results);
}
+ private void getOverseerStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+ String leaderNode = getLeaderNode(zkStateReader.getZkClient());
+ results.add("leader", leaderNode);
+ Stat stat = new Stat();
+ zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
+ results.add("overseer_queue_size", stat.getNumChildren());
+ stat = new Stat();
+ zkStateReader.getZkClient().getData("/overseer/queue-work",null, stat, true);
+ results.add("overseer_work_queue_size", stat.getNumChildren());
+ stat = new Stat();
+ zkStateReader.getZkClient().getData("/overseer/collection-queue-work",null, stat, true);
+ results.add("overseer_collection_queue_size", stat.getNumChildren());
+
+ NamedList overseerStats = new NamedList();
+ NamedList collectionStats = new NamedList();
+ NamedList stateUpdateQueueStats = new NamedList();
+ NamedList workQueueStats = new NamedList();
+ NamedList collectionQueueStats = new NamedList();
+ for (Map.Entry<String, Overseer.Stat> entry : this.stats.getStats().entrySet()) {
+ String key = entry.getKey();
+ NamedList<Object> lst = new SimpleOrderedMap<>();
+ if (key.startsWith("collection_")) {
+ collectionStats.add(key.substring(11), lst);
+ int successes = this.stats.getSuccessCount(entry.getKey());
+ int errors = this.stats.getErrorCount(entry.getKey());
+ lst.add("requests", successes);
+ lst.add("errors", errors);
+ List<Overseer.FailedOp> failureDetails = this.stats.getFailureDetails(key);
+ if (failureDetails != null) {
+ List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
+ for (Overseer.FailedOp failedOp : failureDetails) {
+ SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
+ fail.add("request", failedOp.req.getProperties());
+ fail.add("response", failedOp.resp.getResponse());
+ failures.add(fail);
+ }
+ lst.add("recent_failures", failures);
+ }
+ } else if (key.startsWith("/overseer/queue_")) {
+ stateUpdateQueueStats.add(key.substring(16), lst);
+ } else if (key.startsWith("/overseer/queue-work_")) {
+ workQueueStats.add(key.substring(21), lst);
+ } else if (key.startsWith("/overseer/collection-queue-work_")) {
+ collectionQueueStats.add(key.substring(32), lst);
+ } else {
+ // overseer stats
+ overseerStats.add(key, lst);
+ int successes = this.stats.getSuccessCount(entry.getKey());
+ int errors = this.stats.getErrorCount(entry.getKey());
+ lst.add("requests", successes);
+ lst.add("errors", errors);
+ }
+ Timer timer = entry.getValue().requestTime;
+ Snapshot snapshot = timer.getSnapshot();
+ lst.add("totalTime", timer.getSum());
+ lst.add("avgRequestsPerMinute", timer.getMeanRate());
+ lst.add("5minRateReqsPerMinute", timer.getFiveMinuteRate());
+ lst.add("15minRateReqsPerMinute", timer.getFifteenMinuteRate());
+ lst.add("avgTimePerRequest", timer.getMean());
+ lst.add("medianRequestTime", snapshot.getMedian());
+ lst.add("75thPcRequestTime", snapshot.get75thPercentile());
+ lst.add("95thPcRequestTime", snapshot.get95thPercentile());
+ lst.add("99thPcRequestTime", snapshot.get99thPercentile());
+ lst.add("999thPcRequestTime", snapshot.get999thPercentile());
+ }
+ results.add("overseer_operations", overseerStats);
+ results.add("collection_operations", collectionStats);
+ results.add("overseer_queue", stateUpdateQueueStats);
+ results.add("overseer_internal_queue", workQueueStats);
+ results.add("collection_queue", collectionQueueStats);
+
+ }
+
private void processRoleCommand(ZkNodeProps message, String operation) throws KeeperException, InterruptedException {
SolrZkClient zkClient = zkStateReader.getZkClient();
Map roles = null;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1580463&r1=1580462&r2=1580463&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Sun Mar 23 07:22:28 2014
@@ -34,6 +34,7 @@ import static org.apache.solr.common.clo
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
import java.io.IOException;
@@ -206,6 +207,10 @@ public class CollectionsHandler extends
this.handleRequestStatus(req, rsp);
break;
}
+ case OVERSEERSTATUS: {
+ this.handleOverseerStatus(req, rsp);
+ break;
+ }
default: {
throw new RuntimeException("Unknown action: " + action);
}
@@ -214,6 +219,12 @@ public class CollectionsHandler extends
rsp.setHttpCaching(false);
}
+  /**
+   * Handles the OVERSEERSTATUS collection action. The incoming request parameters are not
+   * consulted; the submitted message carries only the operation name, and the result is
+   * written into {@code rsp} by {@link #handleResponse}.
+   */
+  private void handleOverseerStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    String operation = OVERSEERSTATUS.toLower();
+    ZkNodeProps message = new ZkNodeProps(Overseer.QUEUE_OPERATION, operation);
+    handleResponse(operation, message, rsp);
+  }
+
private void handleProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
req.getParams().required().check("name");
String name = req.getParams().get("name");
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1580463&r1=1580462&r2=1580463&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Sun Mar 23 07:22:28 2014
@@ -96,7 +96,7 @@ public class OverseerCollectionProcessor
DistributedQueue workQueue, DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
- super(zkStateReader, myId, shardHandler, adminPath, workQueue, runningMap, completedMap, failureMap);
+ super(zkStateReader, myId, shardHandler, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
}
@Override
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java?rev=1580463&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java Sun Mar 23 07:22:28 2014
@@ -0,0 +1,123 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+
+/**
+ * Verifies that the OVERSEERSTATUS collection API reports per-operation request and error
+ * counts (plus recent failure details) for operations handled by the Overseer and the
+ * OverseerCollectionProcessor.
+ */
+public class OverseerStatusTest extends BasicDistributedZkTest {
+
+  public OverseerStatusTest() {
+    schemaString = "schema15.xml"; // we need a string id
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    System.setProperty("numShards", Integer.toString(sliceCount));
+    System.setProperty("solr.xml.persist", "true");
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    if (VERBOSE || printLayoutOnTearDown) {
+      super.printLayout();
+    }
+    if (controlClient != null) {
+      controlClient.shutdown();
+    }
+    if (cloudClient != null) {
+      cloudClient.shutdown();
+    }
+    if (controlClientCloud != null) {
+      controlClientCloud.shutdown();
+    }
+    super.tearDown();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked") // status sections are untyped nested NamedLists
+  public void doTest() throws Exception {
+    waitForThingsToLevelOut(15);
+
+    String collectionName = "overseer_status_test";
+    createCollection(collectionName, 1, 1, 1);
+    NamedList<Object> resp = getOverseerStatus();
+    NamedList<Object> collection_operations = (NamedList<Object>) resp.get("collection_operations");
+    NamedList<Object> overseer_operations = (NamedList<Object>) resp.get("overseer_operations");
+    SimpleOrderedMap<Object> createcollection = (SimpleOrderedMap<Object>) collection_operations.get(OverseerCollectionProcessor.CREATECOLLECTION);
+    assertEquals("No stats for createcollection in OverseerCollectionProcessor", 1, createcollection.get("requests"));
+    createcollection = (SimpleOrderedMap<Object>) overseer_operations.get("createcollection");
+    assertEquals("No stats for createcollection in Overseer", 1, createcollection.get("requests"));
+
+    invokeCollectionApi("action", CollectionParams.CollectionAction.RELOAD.toLower(), "name", collectionName);
+    resp = getOverseerStatus();
+    collection_operations = (NamedList<Object>) resp.get("collection_operations");
+    SimpleOrderedMap<Object> reload = (SimpleOrderedMap<Object>) collection_operations.get(OverseerCollectionProcessor.RELOADCOLLECTION);
+    assertEquals("No stats for reload in OverseerCollectionProcessor", 1, reload.get("requests"));
+
+    try {
+      invokeCollectionApi("action", CollectionParams.CollectionAction.SPLITSHARD.toLower(),
+          "collection", "non_existent_collection",
+          "shard", "non_existent_shard");
+    } catch (Exception expected) {
+      // expected: the target collection/shard do not exist, so the split fails;
+      // the failure must show up in the overseer stats checked below
+    }
+    resp = getOverseerStatus();
+    collection_operations = (NamedList<Object>) resp.get("collection_operations");
+    SimpleOrderedMap<Object> split = (SimpleOrderedMap<Object>) collection_operations.get(OverseerCollectionProcessor.SPLITSHARD);
+    assertEquals("No stats for split in OverseerCollectionProcessor", 1, split.get("errors"));
+    assertNotNull(split.get("recent_failures"));
+
+    waitForThingsToLevelOut(15);
+  }
+
+  /** Fetches the current response of the OVERSEERSTATUS collection API. */
+  private NamedList<Object> getOverseerStatus() throws SolrServerException, IOException {
+    return invokeCollectionApi("action",
+        CollectionParams.CollectionAction.OVERSEERSTATUS.toLower());
+  }
+
+  /**
+   * Sends a request to the node-level /admin/collections handler of the first SHARD1 jetty.
+   *
+   * @param args alternating parameter names and values; a trailing unpaired name is ignored
+   * @return the raw response of the collections API
+   */
+  private NamedList<Object> invokeCollectionApi(String... args) throws SolrServerException, IOException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    SolrRequest request = new QueryRequest(params);
+    for (int i = 0; i < args.length - 1; i += 2) {
+      params.add(args[i], args[i + 1]);
+    }
+    request.setPath("/admin/collections");
+
+    // The shard client's base URL points at a core; strip the core name (assumed to be
+    // "collection1") to address the node's admin handlers.
+    String baseUrl = ((HttpSolrServer) shardToJetty.get(SHARD1).get(0).client.solrClient)
+        .getBaseURL();
+    baseUrl = baseUrl.substring(0, baseUrl.length() - "collection1".length());
+
+    HttpSolrServer baseServer = new HttpSolrServer(baseUrl);
+    try {
+      baseServer.setConnectionTimeout(15000);
+      baseServer.setSoTimeout(60000 * 5);
+      return baseServer.request(request);
+    } finally {
+      // each call creates its own server; release its HTTP client so connections don't leak
+      baseServer.shutdown();
+    }
+  }
+}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1580463&r1=1580462&r2=1580463&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Sun Mar 23 07:22:28 2014
@@ -43,6 +43,7 @@ import org.apache.solr.common.cloud.Slic
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.zookeeper.CreateMode;
@@ -912,7 +913,7 @@ public class OverseerTest extends SolrTe
reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
//prepopulate work queue with some items to emulate previous overseer died before persisting state
- DistributedQueue queue = Overseer.getInternalQueue(zkClient);
+ DistributedQueue queue = Overseer.getInternalQueue(zkClient, new Overseer.Stats());
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
ZkStateReader.NODE_NAME_PROP, "node1",
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1580463&r1=1580462&r2=1580463&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Sun Mar 23 07:22:28 2014
@@ -43,7 +43,8 @@ public interface CollectionParams
REMOVEROLE,
CLUSTERPROP,
REQUESTSTATUS,
- ADDREPLICA;
+ ADDREPLICA,
+ OVERSEERSTATUS;
public static CollectionAction get( String p )
{