You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by gc...@apache.org on 2015/08/06 08:07:33 UTC
svn commit: r1694406 [4/4] - in /lucene/dev/trunk/solr:
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/cloud/overseer/
core/src/java/org/apache/solr/handler/admin/
core/src/test/org/apache/solr/cloud/ solrj/src/test/org/apache/solr/...
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java?rev=1694406&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java Thu Aug 6 06:07:32 2015
@@ -0,0 +1,80 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.common.cloud.ZkNodeProps;
+
+/**
+ * Interface for processing messages received by an {@link OverseerProcessor}
+ */
+public interface OverseerMessageHandler {
+
+ /**
+ * @param message the message to process
+ * @param operation the operation to process
+ *
+ * @return response
+ */
+ SolrResponse processMessage(ZkNodeProps message, String operation);
+
+ /**
+ * @return the name of the OverseerMessageHandler
+ */
+ String getName();
+
+ /**
+ * @param operation the operation to be timed
+ *
+ * @return the name of the timer to use for the operation
+ */
+ String getTimerName(String operation);
+
+ /**
+ * @param message the message being processed
+ *
+ * @return the taskKey for the message for handling task exclusivity
+ */
+ String getTaskKey(ZkNodeProps message);
+
+ /**
+ * @param taskKey the key associated with the task, cached from getTaskKey
+ * @param message the message being processed
+ */
+ void markExclusiveTask(String taskKey, ZkNodeProps message);
+
+ /**
+ * @param taskKey the key associated with the task
+ * @param operation the operation being processed
+ */
+ void unmarkExclusiveTask(String taskKey, String operation);
+
+ /**
+ * @param taskKey the key associated with the task
+ * @param message the message being processed
+ *
+ * @return the exclusive marking
+ */
+ ExclusiveMarking checkExclusiveMarking(String taskKey, ZkNodeProps message);
+
+ enum ExclusiveMarking {
+ NOTDETERMINED, // not enough context, fall back to the processor (i.e. look at running tasks)
+ EXCLUSIVE,
+ NONEXCLUSIVE
+ }
+}
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java?rev=1694406&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java Thu Aug 6 06:07:32 2015
@@ -0,0 +1,112 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for prioritization of Overseer nodes, for example with the
+ * ADDROLE collection command.
+ */
+public class OverseerNodePrioritizer {
+
+ private static Logger log = LoggerFactory.getLogger(OverseerNodePrioritizer.class);
+
+ private final ZkStateReader zkStateReader;
+ private final String adminPath;
+ private final ShardHandlerFactory shardHandlerFactory;
+
+ public OverseerNodePrioritizer(ZkStateReader zkStateReader, String adminPath, ShardHandlerFactory shardHandlerFactory) {
+ this.zkStateReader = zkStateReader;
+ this.adminPath = adminPath;
+ this.shardHandlerFactory = shardHandlerFactory;
+ }
+
+ public synchronized void prioritizeOverseerNodes(String overseerId) throws KeeperException, InterruptedException {
+ SolrZkClient zk = zkStateReader.getZkClient();
+ if(!zk.exists(ZkStateReader.ROLES,true))return;
+ Map m = (Map) Utils.fromJSON(zk.getData(ZkStateReader.ROLES, null, new Stat(), true));
+
+ List overseerDesignates = (List) m.get("overseer");
+ if(overseerDesignates==null || overseerDesignates.isEmpty()) return;
+ String ldr = OverseerProcessor.getLeaderNode(zk);
+ if(overseerDesignates.contains(ldr)) return;
+ log.info("prioritizing overseer nodes at {} overseer designates are {}", overseerId, overseerDesignates);
+ List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
+ if(electionNodes.size()<2) return;
+ log.info("sorted nodes {}", electionNodes);
+
+ String designateNodeId = null;
+ for (String electionNode : electionNodes) {
+ if(overseerDesignates.contains( LeaderElector.getNodeName(electionNode))){
+ designateNodeId = electionNode;
+ break;
+ }
+ }
+
+ if(designateNodeId == null){
+ log.warn("No live overseer designate ");
+ return;
+ }
+ if(!designateNodeId.equals( electionNodes.get(1))) { //checking if it is already at no:1
+ log.info("asking node {} to come join election at head", designateNodeId);
+ invokeOverseerOp(designateNodeId, "rejoinAtHead"); //ask designate to come first
+ log.info("asking the old first in line {} to rejoin election ",electionNodes.get(1) );
+ invokeOverseerOp(electionNodes.get(1), "rejoin");//ask second inline to go behind
+ }
+ //now ask the current leader to QUIT , so that the designate can takeover
+ Overseer.getInQueue(zkStateReader.getZkClient()).offer(
+ Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
+ "id", OverseerProcessor.getLeaderId(zkStateReader.getZkClient()))));
+
+ }
+
+ private void invokeOverseerOp(String electionNode, String op) {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
+ params.set("op", op);
+ params.set("qt", adminPath);
+ params.set("electionNode", electionNode);
+ ShardRequest sreq = new ShardRequest();
+ sreq.purpose = 1;
+ String replica = zkStateReader.getBaseUrlForNodeName(LeaderElector.getNodeName(electionNode));
+ sreq.shards = new String[]{replica};
+ sreq.actualShards = sreq.shards;
+ sreq.params = params;
+ shardHandler.submit(sreq, replica, sreq.params);
+ shardHandler.takeCompletedOrError();
+ }
+}
Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java?rev=1694406&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java Thu Aug 6 06:07:32 2015
@@ -0,0 +1,571 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.Overseer.LeaderStatus;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.stats.TimerContext;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+/**
+ * A generic processor run in the Overseer, used for handling items added
+ * to a distributed work queue. Has support for handling exclusive tasks
+ * (i.e. tasks that should not run in parallel with each other).
+ *
+ * An {@link OverseerMessageHandlerSelector} determines which
+ * {@link OverseerMessageHandler} handles specific messages in the
+ * queue.
+ */
+public class OverseerProcessor implements Runnable, Closeable {
+
+ public int maxParallelThreads = 10;
+
+ public ExecutorService tpe ;
+
+ private static Logger log = LoggerFactory
+ .getLogger(OverseerProcessor.class);
+
+ private DistributedQueue workQueue;
+ private DistributedMap runningMap;
+ private DistributedMap completedMap;
+ private DistributedMap failureMap;
+
+ // Set that maintains a list of all the tasks that are running. This is keyed on zk id of the task.
+ final private Set runningTasks;
+
+ // List of completed tasks. This is used to clean up workQueue in zk.
+ final private HashMap<String, QueueEvent> completedTasks;
+
+ private String myId;
+
+ private final ShardHandlerFactory shardHandlerFactory;
+
+ private String adminPath;
+
+ private ZkStateReader zkStateReader;
+
+ private boolean isClosed;
+
+ private Overseer.Stats stats;
+
+ // Set of tasks that have been picked up for processing but not cleaned up from zk work-queue.
+ // It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
+ // deleted from the work-queue as that is a batched operation.
+ final private Set<String> runningZKTasks;
+ private final Object waitLock = new Object();
+
+ private OverseerMessageHandlerSelector selector;
+
+ private OverseerNodePrioritizer prioritizer;
+
+ public OverseerProcessor(ZkStateReader zkStateReader, String myId,
+ final ShardHandlerFactory shardHandlerFactory,
+ String adminPath,
+ Overseer.Stats stats,
+ OverseerMessageHandlerSelector selector,
+ OverseerNodePrioritizer prioritizer,
+ DistributedQueue workQueue,
+ DistributedMap runningMap,
+ DistributedMap completedMap,
+ DistributedMap failureMap) {
+ this.zkStateReader = zkStateReader;
+ this.myId = myId;
+ this.shardHandlerFactory = shardHandlerFactory;
+ this.adminPath = adminPath;
+ this.stats = stats;
+ this.selector = selector;
+ this.prioritizer = prioritizer;
+ this.workQueue = workQueue;
+ this.runningMap = runningMap;
+ this.completedMap = completedMap;
+ this.failureMap = failureMap;
+ this.runningZKTasks = new HashSet<>();
+ this.runningTasks = new HashSet();
+ this.completedTasks = new HashMap<>();
+ }
+
+ @Override
+ public void run() {
+ log.info("Process current queue of overseer operations");
+ LeaderStatus isLeader = amILeader();
+ while (isLeader == LeaderStatus.DONT_KNOW) {
+ log.debug("am_i_leader unclear {}", isLeader);
+ isLeader = amILeader(); // not a no, not a yes, try ask again
+ }
+
+ String oldestItemInWorkQueue = null;
+ // hasLeftOverItems - used for avoiding re-execution of async tasks that were processed by a previous Overseer.
+ // This variable is set in case there's any task found on the workQueue when the OCP starts up and
+ // the id for the queue tail is used as a marker to check for the task in completed/failed map in zk.
+ // Beyond the marker, all tasks can safely be assumed to have never been executed.
+ boolean hasLeftOverItems = true;
+
+ try {
+ oldestItemInWorkQueue = workQueue.getTailId();
+ } catch (KeeperException e) {
+ // We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed
+ // async calls.
+ SolrException.log(log, "", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ if (oldestItemInWorkQueue == null)
+ hasLeftOverItems = false;
+ else
+ log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
+
+ try {
+ prioritizer.prioritizeOverseerNodes(myId);
+ } catch (Exception e) {
+ log.error("Unable to prioritize overseer ", e);
+ }
+
+ // TODO: Make maxThreads configurable.
+
+ this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 100, 0L, TimeUnit.MILLISECONDS,
+ new SynchronousQueue<Runnable>(),
+ new DefaultSolrThreadFactory("OverseerThreadFactory"));
+ try {
+ while (!this.isClosed) {
+ try {
+ isLeader = amILeader();
+ if (LeaderStatus.NO == isLeader) {
+ break;
+ } else if (LeaderStatus.YES != isLeader) {
+ log.debug("am_i_leader unclear {}", isLeader);
+ continue; // not a no, not a yes, try asking again
+ }
+
+ log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasks.size());
+ cleanUpWorkQueue();
+
+ printTrackingMaps();
+
+ boolean waited = false;
+
+ while (runningTasks.size() > maxParallelThreads) {
+ synchronized (waitLock) {
+ waitLock.wait(100);//wait for 100 ms or till a task is complete
+ }
+ waited = true;
+ }
+
+ if (waited)
+ cleanUpWorkQueue();
+
+ List<QueueEvent> heads = workQueue.peekTopN(maxParallelThreads, runningZKTasks, 2000L);
+
+ if (heads == null)
+ continue;
+
+ log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
+
+ if (isClosed) break;
+
+ for (QueueEvent head : heads) {
+ final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+ OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
+ String taskKey = messageHandler.getTaskKey(message);
+ final String asyncId = message.getStr(ASYNC);
+ if (hasLeftOverItems) {
+ if (head.getId().equals(oldestItemInWorkQueue))
+ hasLeftOverItems = false;
+ if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
+ log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]",asyncId );
+ workQueue.remove(head);
+ continue;
+ }
+ }
+
+ if (!checkExclusivity(messageHandler, message, head.getId())) {
+ log.debug("Exclusivity check failed for [{}]", message.toString());
+ continue;
+ }
+
+ try {
+ markTaskAsRunning(messageHandler, head, taskKey, asyncId, message);
+ log.debug("Marked task [{}] as running", head.getId());
+ } catch (KeeperException.NodeExistsException e) {
+ // This should never happen
+ log.error("Tried to pick up task [{}] when it was already running!", head.getId());
+ } catch (InterruptedException e) {
+ log.error("Thread interrupted while trying to pick task for execution.", head.getId());
+ Thread.currentThread().interrupt();
+ }
+
+ log.info(messageHandler.getName() + ": Get the message id:" + head.getId() + " message:" + message.toString());
+ String operation = message.getStr(Overseer.QUEUE_OPERATION);
+ Runner runner = new Runner(messageHandler, message,
+ operation, head);
+ tpe.execute(runner);
+ }
+
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+ log.warn("Overseer cannot talk to ZK");
+ return;
+ }
+ SolrException.log(log, "", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ SolrException.log(log, "", e);
+ }
+ }
+ } finally {
+ this.close();
+ }
+ }
+
+ protected boolean checkExclusivity(OverseerMessageHandler messageHandler, ZkNodeProps message, String id)
+ throws KeeperException, InterruptedException {
+ String taskKey = messageHandler.getTaskKey(message);
+
+ if(taskKey == null)
+ return true;
+
+ OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message);
+ switch (marking) {
+ case NOTDETERMINED:
+ break;
+ case EXCLUSIVE:
+ return true;
+ case NONEXCLUSIVE:
+ return false;
+ default:
+ throw new IllegalArgumentException("Undefined marking: " + marking);
+ }
+
+ if(runningZKTasks.contains(id))
+ return false;
+
+ return true;
+ }
+
+ private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
+ synchronized (completedTasks) {
+ for (String id : completedTasks.keySet()) {
+ workQueue.remove(completedTasks.get(id));
+ runningZKTasks.remove(id);
+ }
+ completedTasks.clear();
+ }
+ }
+
+ public void close() {
+ isClosed = true;
+ if(tpe != null) {
+ if (!tpe.isShutdown()) {
+ tpe.shutdown();
+ try {
+ tpe.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Thread interrupted while waiting for OCP threadpool close.");
+ Thread.currentThread().interrupt();
+ } finally {
+ if (!tpe.isShutdown())
+ tpe.shutdownNow();
+ }
+ }
+ }
+ }
+
+ public static List<String> getSortedOverseerNodeNames(SolrZkClient zk) throws KeeperException, InterruptedException {
+ List<String> children = null;
+ try {
+ children = zk.getChildren(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE, null, true);
+ } catch (Exception e) {
+ log.warn("error ", e);
+ return new ArrayList<>();
+ }
+ LeaderElector.sortSeqs(children);
+ ArrayList<String> nodeNames = new ArrayList<>(children.size());
+ for (String c : children) nodeNames.add(LeaderElector.getNodeName(c));
+ return nodeNames;
+ }
+
+ public static List<String> getSortedElectionNodes(SolrZkClient zk, String path) throws KeeperException, InterruptedException {
+ List<String> children = null;
+ try {
+ children = zk.getChildren(path, null, true);
+ LeaderElector.sortSeqs(children);
+ return children;
+ } catch (Exception e) {
+ throw e;
+ }
+
+ }
+
+ public static String getLeaderNode(SolrZkClient zkClient) throws KeeperException, InterruptedException {
+ String id = getLeaderId(zkClient);
+ return id==null ?
+ null:
+ LeaderElector.getNodeName( id);
+ }
+
+ public static String getLeaderId(SolrZkClient zkClient) throws KeeperException,InterruptedException{
+ byte[] data = null;
+ try {
+ data = zkClient.getData("/overseer_elect/leader", null, new Stat(), true);
+ } catch (KeeperException.NoNodeException e) {
+ return null;
+ }
+ Map m = (Map) Utils.fromJSON(data);
+ return (String) m.get("id");
+ }
+
+ protected LeaderStatus amILeader() {
+ String statsName = "collection_am_i_leader";
+ TimerContext timerContext = stats.time(statsName);
+ boolean success = true;
+ try {
+ ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
+ "/overseer_elect/leader", null, null, true));
+ if (myId.equals(props.getStr("id"))) {
+ return LeaderStatus.YES;
+ }
+ } catch (KeeperException e) {
+ success = false;
+ if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.error("", e);
+ return LeaderStatus.DONT_KNOW;
+ } else if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
+ log.info("", e);
+ } else {
+ log.warn("", e);
+ }
+ } catch (InterruptedException e) {
+ success = false;
+ Thread.currentThread().interrupt();
+ } finally {
+ timerContext.stop();
+ if (success) {
+ stats.success(statsName);
+ } else {
+ stats.error(statsName);
+ }
+ }
+ log.info("According to ZK I (id=" + myId + ") am no longer a leader.");
+ return LeaderStatus.NO;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void markTaskAsRunning(OverseerMessageHandler messageHandler, QueueEvent head, String taskKey,
+ String asyncId, ZkNodeProps message)
+ throws KeeperException, InterruptedException {
+ synchronized (runningZKTasks) {
+ runningZKTasks.add(head.getId());
+ }
+
+ synchronized (runningTasks) {
+ runningTasks.add(head.getId());
+ }
+
+ messageHandler.markExclusiveTask(taskKey, message);
+
+ if(asyncId != null)
+ runningMap.put(asyncId, null);
+ }
+
+ protected class Runner implements Runnable {
+ ZkNodeProps message;
+ String operation;
+ SolrResponse response;
+ QueueEvent head;
+ OverseerMessageHandler messageHandler;
+
+ public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head) {
+ this.message = message;
+ this.operation = operation;
+ this.head = head;
+ this.messageHandler = messageHandler;
+ response = null;
+ }
+
+
+ public void run() {
+ String statsName = messageHandler.getTimerName(operation);
+ final TimerContext timerContext = stats.time(statsName);
+
+ boolean success = false;
+ final String asyncId = message.getStr(ASYNC);
+ String taskKey = messageHandler.getTaskKey(message);
+
+ try {
+ try {
+ log.debug("Runner processing {}", head.getId());
+ response = messageHandler.processMessage(message, operation);
+ } finally {
+ timerContext.stop();
+ updateStats(statsName);
+ }
+
+ if(asyncId != null) {
+ if (response != null && (response.getResponse().get("failure") != null
+ || response.getResponse().get("exception") != null)) {
+ failureMap.put(asyncId, SolrResponse.serializable(response));
+ log.debug("Updated failed map for task with zkid:[{}]", head.getId());
+ } else {
+ completedMap.put(asyncId, SolrResponse.serializable(response));
+ log.debug("Updated completed map for task with zkid:[{}]", head.getId());
+ }
+ } else {
+ head.setBytes(SolrResponse.serializable(response));
+ log.debug("Completed task:[{}]", head.getId());
+ }
+
+ markTaskComplete(messageHandler, head.getId(), asyncId, taskKey);
+ log.debug("Marked task [{}] as completed.", head.getId());
+ printTrackingMaps();
+
+ log.info(messageHandler.getName() + ": Message id:" + head.getId() +
+ " complete, response:" + response.getResponse().toString());
+ success = true;
+ } catch (KeeperException e) {
+ SolrException.log(log, "", e);
+ } catch (InterruptedException e) {
+ // Reset task from tracking data structures so that it can be retried.
+ resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey);
+ log.warn("Resetting task {} as the thread was interrupted.", head.getId());
+ Thread.currentThread().interrupt();
+ } finally {
+ if(!success) {
+ // Reset task from tracking data structures so that it can be retried.
+ resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey);
+ }
+ synchronized (waitLock){
+ waitLock.notifyAll();
+ }
+ }
+ }
+
+ private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey)
+ throws KeeperException, InterruptedException {
+ synchronized (completedTasks) {
+ completedTasks.put(id, head);
+ }
+
+ synchronized (runningTasks) {
+ runningTasks.remove(id);
+ }
+
+ if(asyncId != null)
+ runningMap.remove(asyncId);
+
+ messageHandler.unmarkExclusiveTask(taskKey, operation);
+ }
+
+ private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey) {
+ log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, taskKey);
+ try {
+ if (asyncId != null)
+ runningMap.remove(asyncId);
+
+ synchronized (runningTasks) {
+ runningTasks.remove(id);
+ }
+
+ messageHandler.unmarkExclusiveTask(taskKey, operation);
+ } catch (KeeperException e) {
+ SolrException.log(log, "", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ }
+
+ private void updateStats(String statsName) {
+ if (isSuccessful()) {
+ stats.success(statsName);
+ } else {
+ stats.error(statsName);
+ stats.storeFailureDetails(statsName, message, response);
+ }
+ }
+
+ private boolean isSuccessful() {
+ if(response == null)
+ return false;
+ return !(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null);
+ }
+ }
+
+ private void printTrackingMaps() {
+ if(log.isDebugEnabled()) {
+ synchronized (runningTasks) {
+ log.debug("RunningTasks: {}", runningTasks.toString());
+ }
+ synchronized (completedTasks) {
+ log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
+ }
+ synchronized (runningZKTasks) {
+ log.debug("RunningZKTasks: {}", runningZKTasks.toString());
+ }
+ }
+ }
+
+
+
+ String getId(){
+ return myId;
+ }
+
+ /**
+ * An interface to determine which {@link OverseerMessageHandler}
+ * handles a given message. This could be a single OverseerMessageHandler
+ * for the case where a single type of message is handled (e.g. collection
+ * messages only) , or a different handler could be selected based on the
+ * contents of the message.
+ */
+ public interface OverseerMessageHandlerSelector {
+ OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
+ }
+
+}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java Thu Aug 6 06:07:32 2015
@@ -24,7 +24,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerCollectionMessageHandler;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -87,10 +87,10 @@ public class ClusterStateMutator {
Map<String, Object> collectionProps = new HashMap<>();
- for (Map.Entry<String, Object> e : OverseerCollectionProcessor.COLL_PROPS.entrySet()) {
+ for (Map.Entry<String, Object> e : OverseerCollectionMessageHandler.COLL_PROPS.entrySet()) {
Object val = message.get(e.getKey());
if (val == null) {
- val = OverseerCollectionProcessor.COLL_PROPS.get(e.getKey());
+ val = OverseerCollectionMessageHandler.COLL_PROPS.get(e.getKey());
}
if (val != null) collectionProps.put(e.getKey(), val);
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java Thu Aug 6 06:07:32 2015
@@ -28,7 +28,7 @@ import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.cloud.Assign;
import org.apache.solr.cloud.Overseer;
-import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerCollectionMessageHandler;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -40,7 +40,7 @@ import org.apache.solr.common.util.Utils
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
import static org.apache.solr.common.params.CommonParams.NAME;
@@ -107,18 +107,18 @@ public class ReplicaMutator {
String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
- property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
+ property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property;
}
property = property.toLowerCase(Locale.ROOT);
String propVal = message.getStr(ZkStateReader.PROPERTY_VALUE_PROP);
- String shardUnique = message.getStr(OverseerCollectionProcessor.SHARD_UNIQUE);
+ String shardUnique = message.getStr(OverseerCollectionMessageHandler.SHARD_UNIQUE);
boolean isUnique = false;
if (SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property)) {
if (StringUtils.isNotBlank(shardUnique) && Boolean.parseBoolean(shardUnique) == false) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Overseer ADDREPLICAPROP for " +
- property + " cannot have " + OverseerCollectionProcessor.SHARD_UNIQUE + " set to anything other than" +
+ property + " cannot have " + OverseerCollectionMessageHandler.SHARD_UNIQUE + " set to anything other than" +
"'true'. No action taken");
}
isUnique = true;
@@ -170,7 +170,7 @@ public class ReplicaMutator {
String replicaName = message.getStr(ZkStateReader.REPLICA_PROP);
String property = message.getStr(ZkStateReader.PROPERTY_PROP).toLowerCase(Locale.ROOT);
if (StringUtils.startsWith(property, COLL_PROP_PREFIX) == false) {
- property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
+ property = OverseerCollectionMessageHandler.COLL_PROP_PREFIX + property;
}
Replica replica = clusterState.getReplica(collectionName, replicaName);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java Thu Aug 6 06:07:32 2015
@@ -37,7 +37,7 @@ import org.apache.solr.common.util.Utils
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.params.CommonParams.NAME;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu Aug 6 06:07:32 2015
@@ -71,16 +71,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.REQUESTID;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
import static org.apache.solr.common.cloud.DocCollection.RULE;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java Thu Aug 6 06:07:32 2015
@@ -25,7 +25,7 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.cloud.LeaderElector;
-import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.OverseerProcessor;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -160,7 +160,7 @@ class RebalanceLeaders {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
- List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
@@ -193,7 +193,7 @@ class RebalanceLeaders {
throws KeeperException, InterruptedException {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
- List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
// First, queue up the preferred leader at the head of the queue.
@@ -210,12 +210,12 @@ class RebalanceLeaders {
return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
}
- List<String> electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ List<String> electionNodesTmp = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
// Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
- electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
for (String thisNode : electionNodes) {
@@ -238,7 +238,7 @@ class RebalanceLeaders {
int oldSeq = LeaderElector.getSeq(electionNode);
for (int idx = 0; idx < 600; ++idx) {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
- List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+ List<String> electionNodes = OverseerProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
for (String testNode : electionNodes) {
if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/AsyncMigrateRouteKeyTest.java Thu Aug 6 06:07:32 2015
@@ -50,7 +50,7 @@ public class AsyncMigrateRouteKeyTest ex
String message;
params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
- params.set(OverseerCollectionProcessor.REQUESTID, asyncId);
+ params.set(OverseerCollectionMessageHandler.REQUESTID, asyncId);
// This task takes long enough to run. Also check for the current state of the task to be running.
message = sendStatusRequestWithRetry(params, 5);
assertEquals("found " + asyncId + " in running tasks", message);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BaseCdcrDistributedZkTest.java Thu Aug 6 06:07:32 2015
@@ -45,7 +45,7 @@ import java.io.File;
import java.io.IOException;
import java.util.*;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.*;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.*;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
@@ -398,7 +398,7 @@ public class BaseCdcrDistributedZkTest e
}
Integer replicationFactor = (Integer) collectionProps.get(REPLICATION_FACTOR);
if (replicationFactor == null) {
- replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(REPLICATION_FACTOR);
+ replicationFactor = (Integer) OverseerCollectionMessageHandler.COLL_PROPS.get(REPLICATION_FACTOR);
}
if (confSetName != null) {
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Thu Aug 6 06:07:32 2015
@@ -571,10 +571,10 @@ public class BasicDistributedZkTest exte
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionAction.CREATE.toString());
- params.set(OverseerCollectionProcessor.NUM_SLICES, numShards);
+ params.set(OverseerCollectionMessageHandler.NUM_SLICES, numShards);
params.set(ZkStateReader.REPLICATION_FACTOR, numReplicas);
params.set(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
- if (createNodeSetStr != null) params.set(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSetStr);
+ if (createNodeSetStr != null) params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, createNodeSetStr);
int clientIndex = clients.size() > 1 ? random().nextInt(2) : 0;
List<Integer> list = new ArrayList<>();
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java Thu Aug 6 06:07:32 2015
@@ -82,7 +82,7 @@ import org.apache.solr.core.SolrInfoMBea
import org.apache.solr.servlet.SolrDispatchFilter;
import org.junit.Test;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
@@ -442,7 +442,7 @@ public class CollectionsAPIDistributedZk
String nn1 = ((SolrDispatchFilter) jettys.get(0).getDispatchFilter().getFilter()).getCores().getZkController().getNodeName();
String nn2 = ((SolrDispatchFilter) jettys.get(1).getDispatchFilter().getFilter()).getCores().getZkController().getNodeName();
- params.set(OverseerCollectionProcessor.CREATE_NODE_SET, nn1 + "," + nn2);
+ params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, nn1 + "," + nn2);
request = new QueryRequest(params);
request.setPath("/admin/collections");
gotExp = false;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java Thu Aug 6 06:07:32 2015
@@ -47,8 +47,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteLastCustomShardedReplicaTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteLastCustomShardedReplicaTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteLastCustomShardedReplicaTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteLastCustomShardedReplicaTest.java Thu Aug 6 06:07:32 2015
@@ -36,8 +36,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java Thu Aug 6 06:07:32 2015
@@ -40,8 +40,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java Thu Aug 6 06:07:32 2015
@@ -39,7 +39,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Thu Aug 6 06:07:32 2015
@@ -106,7 +106,7 @@ public class OverseerCollectionProcessor
DistributedQueue workQueue, DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
- super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), workQueue, runningMap, completedMap, failureMap);
+ super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), null, new OverseerNodePrioritizer(zkStateReader, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
}
@Override
@@ -407,14 +407,14 @@ public class OverseerCollectionProcessor
ZkStateReader.REPLICATION_FACTOR, replicationFactor.toString(),
"name", COLLECTION_NAME,
"collection.configName", CONFIG_NAME,
- OverseerCollectionProcessor.NUM_SLICES, numberOfSlices.toString(),
+ OverseerCollectionMessageHandler.NUM_SLICES, numberOfSlices.toString(),
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode.toString()
);
if (sendCreateNodeList) {
- propMap.put(OverseerCollectionProcessor.CREATE_NODE_SET,
+ propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET,
(createNodeList != null)?StrUtils.join(createNodeList, ','):null);
- if (OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE_DEFAULT != createNodeSetShuffle || random().nextBoolean()) {
- propMap.put(OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE, createNodeSetShuffle);
+ if (OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT != createNodeSetShuffle || random().nextBoolean()) {
+ propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE, createNodeSetShuffle);
}
}
@@ -590,7 +590,7 @@ public class OverseerCollectionProcessor
}
}
- if (random().nextBoolean()) Collections.shuffle(createNodeList, OverseerCollectionProcessor.RANDOM);
+ if (random().nextBoolean()) Collections.shuffle(createNodeList, OverseerCollectionMessageHandler.RANDOM);
List<SubmitCapture> submitCaptures = null;
if (collectionExceptedToBeCreated) {
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java Thu Aug 6 06:07:32 2015
@@ -41,9 +41,9 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
import static org.apache.solr.cloud.OverseerCollectionProcessor.getLeaderNode;
import static org.apache.solr.cloud.OverseerCollectionProcessor.getSortedOverseerNodeNames;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java Thu Aug 6 06:07:32 2015
@@ -50,7 +50,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java Thu Aug 6 06:07:32 2015
@@ -43,7 +43,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
public class TestCollectionAPI extends ReplicaPropertiesBase {
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestMiniSolrCloudCluster.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestMiniSolrCloudCluster.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestMiniSolrCloudCluster.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestMiniSolrCloudCluster.java Thu Aug 6 06:07:32 2015
@@ -295,7 +295,7 @@ public class TestMiniSolrCloudCluster ex
// create collection
final String asyncId = (random().nextBoolean() ? null : "asyncId("+collectionName+".create)="+random().nextInt());
- createCollection(miniCluster, collectionName, OverseerCollectionProcessor.CREATE_NODE_SET_EMPTY, asyncId);
+ createCollection(miniCluster, collectionName, OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY, asyncId);
if (asyncId != null) {
assertEquals("did not see async createCollection completion", "completed", AbstractFullDistribZkTestBase.getRequestStateAfterCompletion(asyncId, 330, cloudSolrClient));
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRequestStatusCollectionAPI.java Thu Aug 6 06:07:32 2015
@@ -63,7 +63,7 @@ public class TestRequestStatusCollection
params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
- params.set(OverseerCollectionProcessor.REQUESTID, "1000");
+ params.set(OverseerCollectionMessageHandler.REQUESTID, "1000");
try {
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
@@ -76,7 +76,7 @@ public class TestRequestStatusCollection
// Check for a random (hopefully non-existent request id
params = new ModifiableSolrParams();
params.set(CollectionParams.ACTION, CollectionParams.CollectionAction.REQUESTSTATUS.toString());
- params.set(OverseerCollectionProcessor.REQUESTID, "9999999");
+ params.set(OverseerCollectionMessageHandler.REQUESTID, "9999999");
try {
r = sendRequest(params);
status = (NamedList) r.get("status");
@@ -101,7 +101,7 @@ public class TestRequestStatusCollection
// Check for the request to be completed.
params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
- params.set(OverseerCollectionProcessor.REQUESTID, "1001");
+ params.set(OverseerCollectionMessageHandler.REQUESTID, "1001");
try {
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
} catch (SolrServerException | IOException e) {
@@ -128,7 +128,7 @@ public class TestRequestStatusCollection
params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.REQUESTSTATUS.toString());
- params.set(OverseerCollectionProcessor.REQUESTID, "1002");
+ params.set(OverseerCollectionMessageHandler.REQUESTID, "1002");
try {
message = sendStatusRequestWithRetry(params, MAX_WAIT_TIMEOUT_SECONDS);
Modified: lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java Thu Aug 6 06:07:32 2015
@@ -66,7 +66,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
import static org.apache.solr.common.util.Utils.makeMap;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Thu Aug 6 06:07:32 2015
@@ -81,9 +81,9 @@ import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
import static org.apache.solr.common.util.Utils.makeMap;
/**
@@ -1540,7 +1540,7 @@ public abstract class AbstractFullDistri
}
Integer replicationFactor = (Integer) collectionProps.get(ZkStateReader.REPLICATION_FACTOR);
if(replicationFactor==null){
- replicationFactor = (Integer) OverseerCollectionProcessor.COLL_PROPS.get(ZkStateReader.REPLICATION_FACTOR);
+ replicationFactor = (Integer) OverseerCollectionMessageHandler.COLL_PROPS.get(ZkStateReader.REPLICATION_FACTOR);
}
if (confSetName != null) {
Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java?rev=1694406&r1=1694405&r2=1694406&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java Thu Aug 6 06:07:32 2015
@@ -319,7 +319,7 @@ public class MiniSolrCloudCluster {
params.set("replicationFactor", replicationFactor);
params.set("collection.configName", configName);
if (null != createNodeSet) {
- params.set(OverseerCollectionProcessor.CREATE_NODE_SET, createNodeSet);
+ params.set(OverseerCollectionMessageHandler.CREATE_NODE_SET, createNodeSet);
}
if (null != asyncId) {
params.set(CommonAdminParams.ASYNC, asyncId);