You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/23 00:06:07 UTC
[46/52] [abbrv] [partial] lucene-solr:jira/gradle: Add gradle support
for Solr
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
deleted file mode 100644
index febeec0..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.Closeable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
-
-import com.codahale.metrics.Timer;
-import com.google.common.collect.ImmutableSet;
-import org.apache.commons.io.IOUtils;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.cloud.Overseer.LeaderStatus;
-import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.ID;
-
-/**
- * A generic processor run in the Overseer, used for handling items added
- * to a distributed work queue. Has support for handling exclusive tasks
- * (i.e. tasks that should not run in parallel with each other).
- *
- * An {@link OverseerMessageHandlerSelector} determines which
- * {@link OverseerMessageHandler} handles specific messages in the
- * queue.
- */
-public class OverseerTaskProcessor implements Runnable, Closeable {
-
-  /**
-   * Maximum number of overseer collection operations which can be
-   * executed concurrently
-   */
-  public static final int MAX_PARALLEL_TASKS = 100;
-
-  /**
-   * Upper bound on the number of tasks held in the in-memory blocked-task cache; also caps how many
-   * items are read ahead from the work queue in a single iteration.
-   */
-  public static final int MAX_BLOCKED_TASKS = 1000;
-
-  /** Executor that runs {@link Runner}s; created in {@link #run()} and shut down by {@link #close()}. */
-  public ExecutorService tpe;
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private OverseerTaskQueue workQueue;
-  private DistributedMap runningMap;
-  private DistributedMap completedMap;
-  private DistributedMap failureMap;
-
-  // Set that maintains a list of all the tasks that are running. This is keyed on zk id of the task.
-  // Mutated by Runner threads; always access it while holding its own monitor.
-  final private Set<String> runningTasks;
-
-  // List of completed tasks. This is used to clean up workQueue in zk.
-  final private HashMap<String, QueueEvent> completedTasks;
-
-  private String myId;
-
-  private ZkStateReader zkStateReader;
-
-  // volatile: close() is public and may be called from a thread other than the one executing
-  // run(); the run() loop must observe the flag to terminate.
-  private volatile boolean isClosed;
-
-  private Stats stats;
-
-  // Set of tasks that have been picked up for processing but not cleaned up from zk work-queue.
-  // It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
-  // deleted from the work-queue as that is a batched operation.
-  final private Set<String> runningZKTasks;
-  // This map may contain tasks which are read from work queue but could not
-  // be executed because they are blocked or the execution queue is full
-  // This is an optimization to ensure that we do not read the same tasks
-  // again and again from ZK.
-  final private Map<String, QueueEvent> blockedTasks = new LinkedHashMap<>();
-  // Excludes tasks that are already running or already cached as blocked when peeking the work queue.
-  final private Predicate<String> excludedTasks = new Predicate<String>() {
-    @Override
-    public boolean test(String s) {
-      // runningTasks is concurrently modified by Runner threads, so read it under its lock.
-      synchronized (runningTasks) {
-        if (runningTasks.contains(s)) {
-          return true;
-        }
-      }
-      return blockedTasks.containsKey(s);
-    }
-
-    @Override
-    public String toString() {
-      return StrUtils.join(ImmutableSet.of(runningTasks, blockedTasks.keySet()), ',');
-    }
-
-  };
-
-  // Notified by Runners when they finish so the main loop can stop waiting for a free slot.
-  private final Object waitLock = new Object();
-
-  protected OverseerMessageHandlerSelector selector;
-
-  private OverseerNodePrioritizer prioritizer;
-
-  private String thisNode;
-
-  public OverseerTaskProcessor(ZkStateReader zkStateReader, String myId,
-                                        Stats stats,
-                                        OverseerMessageHandlerSelector selector,
-                                        OverseerNodePrioritizer prioritizer,
-                                        OverseerTaskQueue workQueue,
-                                        DistributedMap runningMap,
-                                        DistributedMap completedMap,
-                                        DistributedMap failureMap) {
-    this.zkStateReader = zkStateReader;
-    this.myId = myId;
-    this.stats = stats;
-    this.selector = selector;
-    this.prioritizer = prioritizer;
-    this.workQueue = workQueue;
-    this.runningMap = runningMap;
-    this.completedMap = completedMap;
-    this.failureMap = failureMap;
-    this.runningZKTasks = new HashSet<>();
-    this.runningTasks = new HashSet<>();
-    this.completedTasks = new HashMap<>();
-    thisNode = Utils.getMDCNode();
-  }
-
-  /**
-   * Main loop: while this node is the overseer leader and the processor is not closed, repeatedly
-   * peek tasks from the work queue, acquire the handler's task lock and hand each task to the
-   * executor. Tasks that cannot run yet are cached in {@code blockedTasks}. Always closes this
-   * processor on exit.
-   */
-  @Override
-  public void run() {
-    MDCLoggingContext.setNode(thisNode);
-    log.debug("Process current queue of overseer operations");
-    LeaderStatus isLeader = amILeader();
-    while (isLeader == LeaderStatus.DONT_KNOW) {
-      log.debug("am_i_leader unclear {}", isLeader);
-      isLeader = amILeader();  // not a no, not a yes, try ask again
-    }
-
-    String oldestItemInWorkQueue = null;
-    // hasLeftOverItems - used for avoiding re-execution of async tasks that were processed by a previous Overseer.
-    // This variable is set in case there's any task found on the workQueue when the OCP starts up and
-    // the id for the queue tail is used as a marker to check for the task in completed/failed map in zk.
-    // Beyond the marker, all tasks can safely be assumed to have never been executed.
-    boolean hasLeftOverItems = true;
-
-    try {
-      oldestItemInWorkQueue = workQueue.getTailId();
-    } catch (KeeperException e) {
-      // We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed
-      // async calls.
-      SolrException.log(log, "", e);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-
-    if (oldestItemInWorkQueue == null) {
-      hasLeftOverItems = false;
-    } else {
-      log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
-    }
-
-    try {
-      prioritizer.prioritizeOverseerNodes(myId);
-    } catch (Exception e) {
-      if (!zkStateReader.getZkClient().isClosed()) {
-        log.error("Unable to prioritize overseer ", e);
-      }
-    }
-
-    // TODO: Make maxThreads configurable.
-
-    this.tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, MAX_PARALLEL_TASKS, 0L, TimeUnit.MILLISECONDS,
-        new SynchronousQueue<Runnable>(),
-        new DefaultSolrThreadFactory("OverseerThreadFactory"));
-    try {
-      while (!this.isClosed) {
-        try {
-          isLeader = amILeader();
-          if (LeaderStatus.NO == isLeader) {
-            break;
-          } else if (LeaderStatus.YES != isLeader) {
-            log.debug("am_i_leader unclear {}", isLeader);
-            continue; // not a no, not a yes, try asking again
-          }
-
-          log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasksSize());
-          cleanUpWorkQueue();
-
-          printTrackingMaps();
-
-          boolean waited = false;
-
-          while (runningTasksSize() > MAX_PARALLEL_TASKS) {
-            synchronized (waitLock) {
-              waitLock.wait(100); // wait for 100 ms or till a task is complete
-            }
-            waited = true;
-          }
-
-          if (waited) {
-            cleanUpWorkQueue();
-          }
-
-          ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
-          heads.addAll(blockedTasks.values());
-
-          // If we have enough items in the blocked tasks already, it makes
-          // no sense to read more items from the work queue. it makes sense
-          // to clear out at least a few items in the queue before we read more items
-          if (heads.size() < MAX_BLOCKED_TASKS) {
-            // instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
-            int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasksSize());
-            List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
-            log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
-            heads.addAll(newTasks);
-          } else {
-            // Prevent free-spinning this loop.
-            Thread.sleep(1000);
-          }
-
-          if (isClosed) break;
-
-          if (heads.isEmpty()) {
-            continue;
-          }
-
-          blockedTasks.clear(); // clear it now; may get refilled below.
-
-          taskBatch.batchId++;
-          boolean tooManyTasks = false;
-          for (QueueEvent head : heads) {
-            if (!tooManyTasks) {
-              tooManyTasks = runningTasksSize() >= MAX_PARALLEL_TASKS;
-            }
-            if (tooManyTasks) {
-              // Too many tasks are running, just shove the rest into the "blocked" queue.
-              if (blockedTasks.size() < MAX_BLOCKED_TASKS) {
-                blockedTasks.put(head.getId(), head);
-              }
-              continue;
-            }
-            if (runningZKTasks.contains(head.getId())) continue;
-            final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
-            final String asyncId = message.getStr(ASYNC);
-            if (hasLeftOverItems) {
-              if (head.getId().equals(oldestItemInWorkQueue)) {
-                hasLeftOverItems = false;
-              }
-              if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
-                log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]", asyncId);
-                workQueue.remove(head);
-                continue;
-              }
-            }
-            String operation = message.getStr(Overseer.QUEUE_OPERATION);
-            if (operation == null) {
-              log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
-              workQueue.remove(head);
-              continue;
-            }
-            OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
-            OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
-            if (lock == null) {
-              log.debug("Exclusivity check failed for [{}]", message);
-              // we may end crossing the size of the MAX_BLOCKED_TASKS. They are fine
-              if (blockedTasks.size() < MAX_BLOCKED_TASKS) {
-                blockedTasks.put(head.getId(), head);
-              }
-              continue;
-            }
-            try {
-              markTaskAsRunning(head, asyncId);
-              log.debug("Marked task [{}] as running", head.getId());
-            } catch (KeeperException.NodeExistsException e) {
-              lock.unlock();
-              // This should never happen
-              log.error("Tried to pick up task [{}] when it was already running!", head.getId());
-              continue;
-            } catch (InterruptedException e) {
-              lock.unlock();
-              // The id was previously passed without a placeholder and silently dropped from the log.
-              log.error("Thread interrupted while trying to pick task [{}] for execution.", head.getId());
-              Thread.currentThread().interrupt();
-              continue;
-            }
-            log.debug("{}: Get the message id:{} message:{}", messageHandler.getName(), head.getId(), message);
-            Runner runner = new Runner(messageHandler, message,
-                operation, head, lock);
-            tpe.execute(runner);
-          }
-
-        } catch (KeeperException e) {
-          if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
-            log.warn("Overseer cannot talk to ZK");
-            return;
-          }
-          SolrException.log(log, "", e);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return;
-        } catch (Exception e) {
-          SolrException.log(log, "", e);
-        }
-      }
-    } finally {
-      this.close();
-    }
-  }
-
-  /** Returns the number of tasks currently tracked as running, read under the runningTasks lock. */
-  private int runningTasksSize() {
-    synchronized (runningTasks) {
-      return runningTasks.size();
-    }
-  }
-
-  /**
-   * Removes every task recorded in {@code completedTasks} from the ZK work queue and from
-   * {@code runningZKTasks}, then clears {@code completedTasks}.
-   */
-  private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
-    synchronized (completedTasks) {
-      for (Map.Entry<String, QueueEvent> entry : completedTasks.entrySet()) {
-        workQueue.remove(entry.getValue());
-        // printTrackingMaps() iterates runningZKTasks under this lock from Runner threads.
-        synchronized (runningZKTasks) {
-          runningZKTasks.remove(entry.getKey());
-        }
-      }
-      completedTasks.clear();
-    }
-  }
-
-  /** Marks this processor closed, shuts down the executor if it was started, and closes the selector quietly. */
-  public void close() {
-    isClosed = true;
-    if (tpe != null) {
-      if (!tpe.isShutdown()) {
-        ExecutorUtil.shutdownAndAwaitTermination(tpe);
-      }
-    }
-    IOUtils.closeQuietly(selector);
-  }
-
-  /**
-   * Returns node names of the overseer election participants in election-sequence order, or an
-   * empty list if the election children cannot be read.
-   */
-  public static List<String> getSortedOverseerNodeNames(SolrZkClient zk) throws KeeperException, InterruptedException {
-    List<String> children = null;
-    try {
-      children = zk.getChildren(Overseer.OVERSEER_ELECT + LeaderElector.ELECTION_NODE, null, true);
-    } catch (InterruptedException e) {
-      // Keep the best-effort empty result, but do not swallow the interrupt.
-      Thread.currentThread().interrupt();
-      log.warn("error ", e);
-      return new ArrayList<>();
-    } catch (Exception e) {
-      log.warn("error ", e);
-      return new ArrayList<>();
-    }
-    LeaderElector.sortSeqs(children);
-    ArrayList<String> nodeNames = new ArrayList<>(children.size());
-    for (String c : children) nodeNames.add(LeaderElector.getNodeName(c));
-    return nodeNames;
-  }
-
-  /** Returns the children of {@code path} sorted by election sequence; ZK failures propagate to the caller. */
-  public static List<String> getSortedElectionNodes(SolrZkClient zk, String path) throws KeeperException, InterruptedException {
-    List<String> children = zk.getChildren(path, null, true);
-    LeaderElector.sortSeqs(children);
-    return children;
-  }
-
-  /** Returns the node name of the current overseer leader, or {@code null} if there is no leader node. */
-  public static String getLeaderNode(SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    String id = getLeaderId(zkClient);
-    return id == null ? null : LeaderElector.getNodeName(id);
-  }
-
-  /** Returns the {@code id} stored in the overseer leader znode, or {@code null} if that znode does not exist. */
-  public static String getLeaderId(SolrZkClient zkClient) throws KeeperException, InterruptedException {
-    byte[] data = null;
-    try {
-      data = zkClient.getData(Overseer.OVERSEER_ELECT + "/leader", null, new Stat(), true);
-    } catch (KeeperException.NoNodeException e) {
-      return null;
-    }
-    Map<?, ?> m = (Map<?, ?>) Utils.fromJSON(data);
-    return (String) m.get(ID);
-  }
-
-  /**
-   * Checks ZK to decide whether this node is the overseer leader. Returns DONT_KNOW on connection
-   * loss, YES when the leader id matches {@code myId}, and NO otherwise (including on interruption).
-   */
-  protected LeaderStatus amILeader() {
-    String statsName = "collection_am_i_leader";
-    Timer.Context timerContext = stats.time(statsName);
-    boolean success = true;
-    String propsId = null;
-    try {
-      ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(
-          Overseer.OVERSEER_ELECT + "/leader", null, null, true));
-      propsId = props.getStr(ID);
-      if (myId.equals(propsId)) {
-        return LeaderStatus.YES;
-      }
-    } catch (KeeperException e) {
-      success = false;
-      if (e.code() == KeeperException.Code.CONNECTIONLOSS) {
-        log.error("", e);
-        return LeaderStatus.DONT_KNOW;
-      } else if (e.code() != KeeperException.Code.SESSIONEXPIRED) {
-        log.warn("", e);
-      } else {
-        log.debug("", e);
-      }
-    } catch (InterruptedException e) {
-      success = false;
-      Thread.currentThread().interrupt();
-    } finally {
-      timerContext.stop();
-      if (success) {
-        stats.success(statsName);
-      } else {
-        stats.error(statsName);
-      }
-    }
-    log.info("According to ZK I (id={}) am no longer a leader. propsId={}", myId, propsId);
-    return LeaderStatus.NO;
-  }
-
-  public boolean isClosed() {
-    return isClosed;
-  }
-
-  /**
-   * Records the task as running in the in-memory tracking sets and, for async tasks, in the ZK
-   * running map.
-   */
-  @SuppressWarnings("unchecked")
-  private void markTaskAsRunning(QueueEvent head, String asyncId)
-      throws KeeperException, InterruptedException {
-    synchronized (runningZKTasks) {
-      runningZKTasks.add(head.getId());
-    }
-
-    synchronized (runningTasks) {
-      runningTasks.add(head.getId());
-    }
-
-    if (asyncId != null) {
-      runningMap.put(asyncId, null);
-    }
-  }
-
-  /**
-   * Executes one queued message on the executor. It records the response (failure/completed map
-   * for async tasks, the queue event bytes otherwise), updates the tracking structures and always
-   * releases the task lock.
-   */
-  protected class Runner implements Runnable {
-    ZkNodeProps message;
-    String operation;
-    SolrResponse response;
-    QueueEvent head;
-    OverseerMessageHandler messageHandler;
-    private final OverseerMessageHandler.Lock lock;
-
-    public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head, OverseerMessageHandler.Lock lock) {
-      this.message = message;
-      this.operation = operation;
-      this.head = head;
-      this.messageHandler = messageHandler;
-      this.lock = lock;
-      response = null;
-    }
-
-    public void run() {
-      String statsName = messageHandler.getTimerName(operation);
-      final Timer.Context timerContext = stats.time(statsName);
-
-      boolean success = false;
-      final String asyncId = message.getStr(ASYNC);
-      String taskKey = messageHandler.getTaskKey(message);
-
-      try {
-        try {
-          log.debug("Runner processing {}", head.getId());
-          response = messageHandler.processMessage(message, operation);
-        } finally {
-          timerContext.stop();
-          updateStats(statsName);
-        }
-
-        if (asyncId != null) {
-          if (response != null && (response.getResponse().get("failure") != null
-              || response.getResponse().get("exception") != null)) {
-            failureMap.put(asyncId, SolrResponse.serializable(response));
-            log.debug("Updated failed map for task with zkid:[{}]", head.getId());
-          } else {
-            completedMap.put(asyncId, SolrResponse.serializable(response));
-            log.debug("Updated completed map for task with zkid:[{}]", head.getId());
-          }
-        } else {
-          head.setBytes(SolrResponse.serializable(response));
-          log.debug("Completed task:[{}]", head.getId());
-        }
-
-        markTaskComplete(head.getId(), asyncId);
-        log.debug("Marked task [{}] as completed.", head.getId());
-        printTrackingMaps();
-
-        // response can be null (see isSuccessful); dereferencing it here used to throw after the
-        // task was already marked complete, which then triggered a spurious reset.
-        log.debug("{}: Message id:{} complete, response:{}", messageHandler.getName(), head.getId(),
-            response == null ? null : response.getResponse());
-        success = true;
-      } catch (KeeperException e) {
-        SolrException.log(log, "", e);
-      } catch (InterruptedException e) {
-        // The finally block resets the task (success is false) so it can be retried; resetting
-        // here as well would do the work twice.
-        log.warn("Resetting task {} as the thread was interrupted.", head.getId());
-        Thread.currentThread().interrupt();
-      } finally {
-        lock.unlock();
-        if (!success) {
-          // Reset task from tracking data structures so that it can be retried.
-          resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
-        }
-        synchronized (waitLock) {
-          waitLock.notifyAll();
-        }
-      }
-    }
-
-    /** Moves the task from running to completed tracking and removes it from the ZK work queue. */
-    private void markTaskComplete(String id, String asyncId)
-        throws KeeperException, InterruptedException {
-      synchronized (completedTasks) {
-        completedTasks.put(id, head);
-      }
-
-      synchronized (runningTasks) {
-        runningTasks.remove(id);
-      }
-
-      if (asyncId != null) {
-        if (!runningMap.remove(asyncId)) {
-          log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
-        }
-      }
-
-      workQueue.remove(head);
-    }
-
-    /** Drops the task from the running map and running set so a later iteration can pick it up again. */
-    private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
-      log.warn("Resetting task: {}, requestid: {}, taskKey: {}", id, asyncId, taskKey);
-      try {
-        if (asyncId != null) {
-          if (!runningMap.remove(asyncId)) {
-            log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
-          }
-        }
-
-        synchronized (runningTasks) {
-          runningTasks.remove(id);
-        }
-
-      } catch (KeeperException e) {
-        SolrException.log(log, "", e);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    /** Records success or error for {@code statsName}, storing failure details on error. */
-    private void updateStats(String statsName) {
-      if (isSuccessful()) {
-        stats.success(statsName);
-      } else {
-        stats.error(statsName);
-        stats.storeFailureDetails(statsName, message, response);
-      }
-    }
-
-    /** True when a response exists and carries neither a "failure" nor an "exception" entry. */
-    private boolean isSuccessful() {
-      if (response == null) {
-        return false;
-      }
-      return !(response.getResponse().get("failure") != null || response.getResponse().get("exception") != null);
-    }
-  }
-
-  /** Logs the contents of the tracking collections at debug level. */
-  private void printTrackingMaps() {
-    if (log.isDebugEnabled()) {
-      synchronized (runningTasks) {
-        log.debug("RunningTasks: {}", runningTasks.toString());
-      }
-      log.debug("BlockedTasks: {}", blockedTasks.keySet().toString());
-      synchronized (completedTasks) {
-        log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
-      }
-      synchronized (runningZKTasks) {
-        log.debug("RunningZKTasks: {}", runningZKTasks.toString());
-      }
-    }
-  }
-
-  String getId() {
-    return myId;
-  }
-
-  /**
-   * An interface to determine which {@link OverseerMessageHandler}
-   * handles a given message. This could be a single OverseerMessageHandler
-   * for the case where a single type of message is handled (e.g. collection
-   * messages only), or a different handler could be selected based on the
-   * contents of the message.
-   */
-  public interface OverseerMessageHandlerSelector extends Closeable {
-    OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
-  }
-
-  final private TaskBatch taskBatch = new TaskBatch();
-
-  /**
-   * View handed to {@link OverseerMessageHandler#lockTask}: the id of the current main-loop batch
-   * (incremented once per iteration in {@link #run()}) and the current running-task count.
-   */
-  public class TaskBatch {
-    private long batchId = 0;
-
-    public long getId() {
-      return batchId;
-    }
-
-    public int getRunningTasks() {
-      synchronized (runningTasks) {
-        return runningTasks.size();
-      }
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
deleted file mode 100644
index 66a31c5..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import com.codahale.metrics.Timer;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Predicate;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.util.Pair;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link ZkDistributedQueue} augmented with helper methods specific to the overseer task queues.
- * Methods specific to this subclass ignore superclass internal state and hit ZK directly.
- * This is inefficient! But the API on this class is kind of muddy..
- */
-public class OverseerTaskQueue extends ZkDistributedQueue {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  // Name prefix of the ephemeral-sequential response nodes created by offer().
-  private static final String RESPONSE_PREFIX = "qnr-" ;
-
-  public OverseerTaskQueue(SolrZkClient zookeeper, String dir) {
-    this(zookeeper, dir, new Stats());
-  }
-
-  public OverseerTaskQueue(SolrZkClient zookeeper, String dir, Stats stats) {
-    super(zookeeper, dir, stats);
-  }
-
-  /**
-   * Returns true if the queue contains a task whose message maps {@code requestIdKey} to
-   * {@code requestId}. Nodes removed concurrently by another client are skipped.
-   */
-  public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
-      throws KeeperException, InterruptedException {
-
-    List<String> childNames = zookeeper.getChildren(dir, null, true);
-    stats.setQueueLength(childNames.size());
-    for (String childName : childNames) {
-      if (childName != null && childName.startsWith(PREFIX)) {
-        try {
-          byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
-          if (data != null) {
-            ZkNodeProps message = ZkNodeProps.load(data);
-            if (message.containsKey(requestIdKey)) {
-              Object found = message.get(requestIdKey);
-              log.debug("Looking for {}, found {}", requestId, found);
-              // A key present with a null value simply does not match (previously an NPE).
-              if (found != null && found.equals(requestId)) return true;
-            }
-          }
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
-        }
-      }
-    }
-
-    return false;
-  }
-
-  /**
-   * Remove the event and save the response into the other path.
-   */
-  public void remove(QueueEvent event) throws KeeperException,
-      InterruptedException {
-    Timer.Context time = stats.time(dir + "_remove_event");
-    try {
-      String path = event.getId();
-      String responsePath = dir + "/" + RESPONSE_PREFIX
-          + path.substring(path.lastIndexOf("-") + 1);
-      if (zookeeper.exists(responsePath, true)) {
-        zookeeper.setData(responsePath, event.getBytes(), true);
-      } else {
-        log.info("Response ZK path: {} doesn't exist. Requestor may have disconnected from ZooKeeper", responsePath);
-      }
-      try {
-        zookeeper.delete(path, -1, true);
-      } catch (KeeperException.NoNodeException ignored) {
-        // already deleted; nothing to do
-      }
-    } finally {
-      time.stop();
-    }
-  }
-
-  /**
-   * Watcher that blocks until a WatchedEvent occurs for a znode.
-   */
-  static final class LatchWatcher implements Watcher {
-
-    private final Lock lock;
-    private final Condition eventReceived;
-    // volatile: written under the lock by the watcher thread, read lock-free by getWatchedEvent().
-    private volatile WatchedEvent event;
-    private final Event.EventType latchEventType;
-
-    LatchWatcher() {
-      this(null);
-    }
-
-    LatchWatcher(Event.EventType eventType) {
-      this.lock = new ReentrantLock();
-      this.eventReceived = lock.newCondition();
-      this.latchEventType = eventType;
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-      // session events are not change events, and do not remove the watcher
-      if (Event.EventType.None.equals(event.getType())) {
-        return;
-      }
-      // If latchEventType is not null, only fire if the type matches
-      log.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState(), latchEventType);
-      if (latchEventType == null || event.getType() == latchEventType) {
-        lock.lock();
-        try {
-          this.event = event;
-          eventReceived.signalAll();
-        } finally {
-          lock.unlock();
-        }
-      }
-    }
-
-    /**
-     * Blocks until a matching event has been received or {@code timeoutMs} has elapsed.
-     * Loops on the condition because {@link Condition#await} may wake up spuriously.
-     */
-    public void await(long timeoutMs) throws InterruptedException {
-      assert timeoutMs > 0;
-      long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
-      lock.lock();
-      try {
-        while (this.event == null && remainingNanos > 0) {
-          remainingNanos = eventReceived.awaitNanos(remainingNanos);
-        }
-      } finally {
-        lock.unlock();
-      }
-    }
-
-    public WatchedEvent getWatchedEvent() {
-      return event;
-    }
-  }
-
-  /**
-   * Inserts data into zookeeper, creating the queue's directory node first if it is missing.
-   *
-   * @return the path of the created node
-   */
-  private String createData(String path, byte[] data, CreateMode mode)
-      throws KeeperException, InterruptedException {
-    for (;;) {
-      try {
-        return zookeeper.create(path, data, mode, true);
-      } catch (KeeperException.NoNodeException e) {
-        try {
-          zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
-        } catch (KeeperException.NodeExistsException ne) {
-          // someone created it
-        }
-      }
-    }
-  }
-
-  /**
-   * Offer the data and wait (up to {@code timeout} ms) for the response written into the
-   * matching response node; the response node is deleted before returning.
-   */
-  public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
-      InterruptedException {
-    Timer.Context time = stats.time(dir + "_offer");
-    try {
-      // Create and watch the response node before creating the request node;
-      // otherwise we may miss the response.
-      String watchID = createResponseNode();
-
-      LatchWatcher watcher = new LatchWatcher();
-      Stat stat = zookeeper.exists(watchID, watcher, true);
-
-      // create the request node
-      createRequestNode(data, watchID);
-
-      if (stat != null) {
-        watcher.await(timeout);
-      }
-      byte[] bytes = zookeeper.getData(watchID, null, null, true);
-      // create the event before deleting the node, otherwise we can get the deleted
-      // event from the watcher.
-      QueueEvent event = new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
-      zookeeper.delete(watchID, -1, true);
-      return event;
-    } finally {
-      time.stop();
-    }
-  }
-
-  // The request node reuses the sequence suffix of its response node so remove() can find the response.
-  void createRequestNode(byte[] data, String watchID) throws KeeperException, InterruptedException {
-    createData(dir + "/" + PREFIX + watchID.substring(watchID.lastIndexOf("-") + 1),
-        data, CreateMode.PERSISTENT);
-  }
-
-  String createResponseNode() throws KeeperException, InterruptedException {
-    return createData(
-        dir + "/" + RESPONSE_PREFIX,
-        null, CreateMode.EPHEMERAL_SEQUENTIAL);
-  }
-
-  /**
-   * Returns up to {@code n} queue elements whose full path is not matched by {@code excludeSet},
-   * waiting up to {@code waitMillis} as implemented by the superclass's peekElements.
-   */
-  public List<QueueEvent> peekTopN(int n, Predicate<String> excludeSet, long waitMillis)
-      throws KeeperException, InterruptedException {
-    ArrayList<QueueEvent> topN = new ArrayList<>();
-
-    log.debug("Peeking for top {} elements. ExcludeSet: {}", n, excludeSet);
-    Timer.Context time;
-    if (waitMillis == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
-    else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
-
-    try {
-      for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.test(dir + "/" + child))) {
-        topN.add(new QueueEvent(dir + "/" + element.first(),
-            element.second(), null));
-      }
-      printQueueEventsListElementIds(topN);
-      return topN;
-    } finally {
-      time.stop();
-    }
-  }
-
-  /** Debug-logs the ids of the given events as a bracketed, comma-separated list. */
-  private static void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
-    if (log.isDebugEnabled() && !topN.isEmpty()) {
-      StringBuilder sb = new StringBuilder("[");
-      String separator = "";
-      for (QueueEvent queueEvent : topN) {
-        // separator goes before every id but the first, so no trailing ", " is left behind
-        sb.append(separator).append(queueEvent.getId());
-        separator = ", ";
-      }
-      sb.append("]");
-      log.debug("Returning topN elements: {}", sb);
-    }
-  }
-
-  /**
-   * Gets the last element of the Queue without removing it.
-   *
-   * @return the full path of the last existing child, or {@code null} if none could be read
-   */
-  public String getTailId() throws KeeperException, InterruptedException {
-    // TODO: could we use getChildren here? Unsure what freshness guarantee the caller needs.
-    TreeSet<String> orderedChildren = fetchZkChildren(null);
-
-    for (String headNode : orderedChildren.descendingSet()) {
-      if (headNode != null) {
-        try {
-          // getData confirms the node still exists before its id is reported.
-          QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
-              null, null, true), null);
-          return queueEvent.getId();
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
-        }
-      }
-    }
-    return null;
-  }
-
-  /** A queue element: its znode path (id), payload bytes, and optional triggering watch event. Equality is by id. */
-  public static class QueueEvent {
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((id == null) ? 0 : id.hashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) return true;
-      if (obj == null) return false;
-      if (getClass() != obj.getClass()) return false;
-      QueueEvent other = (QueueEvent) obj;
-      if (id == null) {
-        if (other.id != null) return false;
-      } else if (!id.equals(other.id)) return false;
-      return true;
-    }
-
-    private WatchedEvent event = null;
-    private String id;
-    private byte[] bytes;
-
-    QueueEvent(String id, byte[] bytes, WatchedEvent event) {
-      this.id = id;
-      this.bytes = bytes;
-      this.event = event;
-    }
-
-    public void setId(String id) {
-      this.id = id;
-    }
-
-    public String getId() {
-      return id;
-    }
-
-    public void setBytes(byte[] bytes) {
-      this.bytes = bytes;
-    }
-
-    public byte[] getBytes() {
-      return bytes;
-    }
-
-    public WatchedEvent getWatchedEvent() {
-      return event;
-    }
-
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
deleted file mode 100644
index 007d221..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.SolrCore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Start recovery of a core if its term is less than leader's term
- */
-public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final CoreDescriptor coreDescriptor;
-  private final CoreContainer coreContainer;
-  // Highest term of this replica for which recovery has already been requested. Terms of other
-  // replicas may change while ours stays the same; tracking this ensures we only start recovery
-  // once for a given term value of this replica.
-  private final AtomicLong lastTermDoRecovery;
-
-  RecoveringCoreTermWatcher(CoreDescriptor coreDescriptor, CoreContainer coreContainer) {
-    this.coreDescriptor = coreDescriptor;
-    this.coreContainer = coreContainer;
-    this.lastTermDoRecovery = new AtomicLong(-1);
-  }
-
-  /**
-   * Reacts to a change of the shard's terms by starting recovery of the watched core when its term
-   * is not the highest one and recovery has not yet been requested for that term.
-   *
-   * @param terms the current shard terms
-   * @return {@code false} if the container is shut down, the core is missing or closed, or an error
-   *         occurred (presumably telling the caller to drop this watcher — TODO confirm against
-   *         ZkShardTerms); {@code true} otherwise
-   */
-  @Override
-  public boolean onTermChanged(ZkShardTerms.Terms terms) {
-    if (coreContainer.isShutDown()) return false;
-
-    try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) {
-      if (solrCore == null || solrCore.isClosed()) {
-        return false;
-      }
-
-      if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
-      String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
-      if (terms.haveHighestTermValue(coreNodeName)) return true;
-
-      final long term = terms.getTerm(coreNodeName);
-      // Atomically raise the recorded term and learn the previous value, so that concurrent
-      // notifications carrying the same term start recovery at most once.
-      final long previousTerm = lastTermDoRecovery.getAndAccumulate(term, Math::max);
-      if (previousTerm < term) {
-        log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
-        solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
-      }
-    } catch (Exception e) {
-      log.info("Failed to watch term of core {}", coreDescriptor.getName(), e);
-      return false;
-    }
-
-    return true;
-  }
-
-  /** Watchers are equal when they watch the core with the same name. */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o;
-
-    return coreDescriptor.getName().equals(that.coreDescriptor.getName());
-  }
-
-  @Override
-  public int hashCode() {
-    return coreDescriptor.getName().hashCode();
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
deleted file mode 100644
index 94e126e..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ /dev/null
@@ -1,873 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.lucene.search.MatchAllDocsQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.SolrPingResponse;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.DirectoryFactory.DirContext;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler;
-import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestHandler;
-import org.apache.solr.search.SolrIndexSearcher;
-import org.apache.solr.update.CdcrUpdateLog;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.PeerSyncWithLeader;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.util.RefCounted;
-import org.apache.solr.util.SolrPluginUtils;
-import org.apache.solr.util.plugin.NamedListInitializedPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class may change in future and customisations are not supported
- * between versions in terms of API or back compat behaviour.
- * @lucene.experimental
- */
-public class RecoveryStrategy implements Runnable, Closeable {
-
-  /**
-   * Pluggable factory for {@link RecoveryStrategy}. Arguments received in {@link #init(NamedList)}
-   * are applied to every created strategy through its setters.
-   */
-  public static class Builder implements NamedListInitializedPlugin {
-    private NamedList args;
-
-    @Override
-    public void init(NamedList args) {
-      this.args = args;
-    }
-
-    // this should only be used from SolrCoreState
-    public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd,
-        RecoveryStrategy.RecoveryListener recoveryListener) {
-      final RecoveryStrategy recoveryStrategy = newRecoveryStrategy(cc, cd, recoveryListener);
-      SolrPluginUtils.invokeSetters(recoveryStrategy, args);
-      return recoveryStrategy;
-    }
-
-    /** Instantiates the strategy; subclasses may override to supply a custom implementation. */
-    protected RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
-        RecoveryStrategy.RecoveryListener recoveryListener) {
-      return new RecoveryStrategy(cc, cd, recoveryListener);
-    }
-  }
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  // Pause after sending the prep-recovery command (see SOLR-7141); the default can be overridden
-  // with the system property below.
-  private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500);
-  private int maxRetries = 500;
-  // Unit of the back-off between retries; the wait is this value times a loop count capped at 12.
-  private int startingRecoveryDelayMilliSeconds = 5000;
-
-  /** Callback notified when recovery finally succeeds or gives up. */
-  public static interface RecoveryListener {
-    public void recovered();
-    public void failed();
-  }
-
-  private volatile boolean close = false;
-
-  private RecoveryListener recoveryListener;
-  private ZkController zkController;
-  private String baseUrl;
-  private String coreZkNodeName;
-  private ZkStateReader zkStateReader;
-  private volatile String coreName;
-  private int retries;
-  private boolean recoveringAfterStartup;
-  private CoreContainer cc;
-  // Last prep-recovery request sent to the leader; aborted by close() and before each new attempt.
-  private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
-  private final Replica.Type replicaType;
-
-  protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
-    this.cc = cc;
-    this.coreName = cd.getName();
-    this.recoveryListener = recoveryListener;
-    zkController = cc.getZkController();
-    zkStateReader = zkController.getZkStateReader();
-    baseUrl = zkController.getBaseUrl();
-    coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
-    replicaType = cd.getCloudDescriptor().getReplicaType();
-  }
-
-  final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
-    return waitForUpdatesWithStaleStatePauseMilliSeconds;
-  }
-
-  final public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(int waitForUpdatesWithStaleStatePauseMilliSeconds) {
-    this.waitForUpdatesWithStaleStatePauseMilliSeconds = waitForUpdatesWithStaleStatePauseMilliSeconds;
-  }
-
-  final public int getMaxRetries() {
-    return maxRetries;
-  }
-
-  final public void setMaxRetries(int maxRetries) {
-    this.maxRetries = maxRetries;
-  }
-
-  final public int getStartingRecoveryDelayMilliSeconds() {
-    return startingRecoveryDelayMilliSeconds;
-  }
-
-  final public void setStartingRecoveryDelayMilliSeconds(int startingRecoveryDelayMilliSeconds) {
-    this.startingRecoveryDelayMilliSeconds = startingRecoveryDelayMilliSeconds;
-  }
-
-  final public boolean getRecoveringAfterStartup() {
-    return recoveringAfterStartup;
-  }
-
-  final public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
-    this.recoveringAfterStartup = recoveringAfterStartup;
-  }
-
-  /**
-   * Marks this strategy closed so the recovery loops stop retrying, and aborts any in-flight
-   * prep-recovery request to the leader.
-   */
-  @Override
-  final public void close() {
-    close = true;
-    // read the volatile field once so the null check and the call see the same request
-    final HttpUriRequest prevRequest = prevSendPreRecoveryHttpUriRequest;
-    if (prevRequest != null) {
-      prevRequest.abort();
-    }
-    log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
-  }
-
-  /**
-   * Publishes RECOVERY_FAILED for the core; whether or not publishing succeeds, closes this
-   * strategy and notifies the listener of the failure.
-   */
-  final private void recoveryFailed(final SolrCore core,
-      final ZkController zkController, final String baseUrl,
-      final String shardZkNodeName, final CoreDescriptor cd) throws Exception {
-    SolrException.log(log, "Recovery failed - I give up.");
-    try {
-      zkController.publish(cd, Replica.State.RECOVERY_FAILED);
-    } finally {
-      close();
-      recoveryListener.failed();
-    }
-  }
-
-  /**
-   * This method may change in future and customisations are not supported
-   * between versions in terms of API or back compat behaviour.
-   *
-   * @param leaderprops properties of the current shard leader
-   * @return the core URL to replicate the index from
-   * @lucene.experimental
-   */
-  protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
-    return new ZkCoreNodeProps(leaderprops).getCoreUrl();
-  }
-
-  /**
-   * Commits on the leader, then synchronously fetches its index through the local replication
-   * handler. Returns without fetching if this strategy was closed in the meantime.
-   *
-   * @throws SolrException if no replication handler is registered or the fetch was unsuccessful
-   */
-  final private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
-      throws SolrServerException, IOException {
-
-    final String leaderUrl = getReplicateLeaderUrl(leaderprops);
-
-    log.info("Attempting to replicate from [{}].", leaderUrl);
-
-    // send commit
-    commitOnLeader(leaderUrl);
-
-    // use rep handler directly, so we can do this sync rather than async
-    SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
-    ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-
-    if (replicationHandler == null) {
-      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-          "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
-    }
-
-    ModifiableSolrParams solrParams = new ModifiableSolrParams();
-    solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
-    solrParams.set(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, replicaType == Replica.Type.TLOG);
-    // always download the tlogs from the leader when running with cdcr enabled. We need to have all the tlogs
-    // to ensure leader failover doesn't cause missing docs on the target
-    if (core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
-      solrParams.set(ReplicationHandler.TLOG_FILES, true);
-    }
-
-    if (isClosed()) return; // we check closed on return
-    boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful();
-
-    if (!success) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
-    }
-
-    // solrcloud_debug
-    if (log.isDebugEnabled()) {
-      try {
-        RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
-        try {
-          SolrIndexSearcher searcher = searchHolder.get();
-          Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
-          try {
-            log.debug(core.getCoreContainer()
-                .getZkController().getNodeName()
-                + " replicated "
-                + searcher.count(new MatchAllDocsQuery())
-                + " from "
-                + leaderUrl
-                + " gen:"
-                + (core.getDeletionPolicy().getLatestCommit() == null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration())
-                + " data:" + core.getDataDir()
-                + " index:" + core.getIndexDir()
-                + " newIndex:" + core.getNewIndexDir()
-                + " files:" + Arrays.asList(dir.listAll()));
-          } finally {
-            core.getDirectoryFactory().release(dir);
-          }
-        } finally {
-          // released even when obtaining the directory fails
-          searchHolder.decref();
-        }
-      } catch (Exception e) {
-        log.debug("Error in solrcloud_debug block", e);
-      }
-    }
-
-  }
-
-  /** Sends a commit (without opening a new searcher) to the leader before replicating from it. */
-  final private void commitOnLeader(String leaderUrl) throws SolrServerException,
-      IOException {
-    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl)
-        .withConnectionTimeout(30000)
-        .build()) {
-      UpdateRequest ureq = new UpdateRequest();
-      ureq.setParams(new ModifiableSolrParams());
-      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-//      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if "onlyLeaderIndexes"?
-      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
-      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
-          client);
-    }
-  }
-
-  /**
-   * Runs recovery for the configured core with the core set as the logging context. Failures are
-   * logged and rethrown wrapped in a {@link ZooKeeperException}.
-   */
-  @Override
-  final public void run() {
-
-    // set request info for logging
-    try (SolrCore core = cc.getCore(coreName)) {
-
-      if (core == null) {
-        SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
-        return;
-      }
-      MDCLoggingContext.setCore(core);
-
-      log.info("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
-
-      try {
-        doRecovery(core);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        SolrException.log(log, "", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-      } catch (Exception e) {
-        log.error("", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-      }
-    } finally {
-      MDCLoggingContext.clear();
-    }
-  }
-
-  /**
-   * Recovers the core using peer-sync-or-replication when its replica type requires a transaction
-   * log, and replication only otherwise.
-   */
-  final public void doRecovery(SolrCore core) throws Exception {
-    if (core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) {
-      doSyncOrReplicateRecovery(core);
-    } else {
-      doReplicateOnlyRecovery(core);
-    }
-  }
-
-  /** Recovery loop that only replicates the full index from the leader (no update log involved). */
-  final private void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
-    boolean successfulRecovery = false;
-
-//  if (core.getUpdateHandler().getUpdateLog() != null) {
-//    SolrException.log(log, "'replicate-only' recovery strategy should only be used if no update logs are present, but this core has one: "
-//        + core.getUpdateHandler().getUpdateLog());
-//    return;
-//  }
-    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
-      try {
-        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
-        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
-        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
-        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
-
-        boolean isLeader = leaderUrl.equals(ourUrl); //TODO: We can probably delete most of this code if we say this strategy can only be used for pull replicas
-        if (isLeader && !cloudDesc.isLeader()) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
-        }
-        if (cloudDesc.isLeader()) {
-          assert cloudDesc.getReplicaType() != Replica.Type.PULL;
-          // we are now the leader - no one else must have been suitable
-          log.warn("We have not yet recovered - but we are now the leader!");
-          log.info("Finished recovery process.");
-          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          return;
-        }
-
-        log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
-            ourUrl);
-        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
-
-        if (isClosed()) {
-          log.info("Recovery for core {} has been closed", core.getName());
-          break;
-        }
-        log.info("Starting Replication Recovery.");
-
-        try {
-          log.info("Stopping background replicate from leader process");
-          zkController.stopReplicationFromLeader(coreName);
-          replicate(zkController.getNodeName(), core, leaderprops);
-
-          if (isClosed()) {
-            log.info("Recovery for core {} has been closed", core.getName());
-            break;
-          }
-
-          log.info("Replication Recovery was successful.");
-          successfulRecovery = true;
-        } catch (Exception e) {
-          SolrException.log(log, "Error while trying to recover", e);
-        }
-
-      } catch (Exception e) {
-        SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
-      } finally {
-        if (successfulRecovery) {
-          log.info("Restaring background replicate from leader process");
-          zkController.startReplicationFromLeader(coreName, false);
-          log.info("Registering as Active after recovery.");
-          try {
-            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          } catch (Exception e) {
-            log.error("Could not publish as ACTIVE after succesful recovery", e);
-            successfulRecovery = false;
-          }
-
-          if (successfulRecovery) {
-            close = true;
-            recoveryListener.recovered();
-          }
-        }
-      }
-
-      if (!successfulRecovery) {
-        // lets pause for a moment and we need to try again...
-        // TODO: we don't want to retry for some problems?
-        // Or do a fall off retry...
-        try {
-
-          if (isClosed()) {
-            log.info("Recovery for core {} has been closed", core.getName());
-            break;
-          }
-
-          log.error("Recovery failed - trying again... ({})", retries);
-
-          retries++;
-          if (retries >= maxRetries) {
-            SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
-            try {
-              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
-            } catch (Exception e) {
-              SolrException.log(log, "Could not publish that recovery failed", e);
-            }
-            break;
-          }
-        } catch (Exception e) {
-          SolrException.log(log, "An error has occurred during recovery", e);
-        }
-
-        try {
-          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
-          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
-          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
-          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
-          int loopCount = retries < 4 ? (int) Math.min(Math.pow(2, retries), 12) : 12;
-          log.info("Wait [{}] seconds before trying to recover again (attempt={})",
-              TimeUnit.MILLISECONDS.toSeconds((long) loopCount * startingRecoveryDelayMilliSeconds), retries);
-          for (int i = 0; i < loopCount; i++) {
-            if (isClosed()) {
-              log.info("Recovery for core {} has been closed", core.getName());
-              break; // check if someone closed us
-            }
-            Thread.sleep(startingRecoveryDelayMilliSeconds);
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          log.warn("Recovery was interrupted.", e);
-          close = true;
-        }
-      }
-
-    }
-    // We skip core.seedVersionBuckets(); We don't have a transaction log
-    log.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
-  }
-
-  /**
-   * Recovery loop for replicas with an update log. It buffers updates and tells the leader we are
-   * recovering. On the first pass it tries PeerSync (skipped for TLOG replicas or after an
-   * incomplete earlier replication); otherwise it replicates the full index and replays the
-   * buffered updates. It retries with back-off until it succeeds, is closed, or maxRetries is hit.
-   */
-  // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
-  public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
-    boolean successfulRecovery = false;
-
-    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-    if (ulog == null) {
-      SolrException.log(log, "No UpdateLog found - cannot recover.");
-      recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
-          core.getCoreDescriptor());
-      return;
-    }
-
-    // we temporary ignore peersync for tlog replicas
-    boolean firstTime = replicaType != Replica.Type.TLOG;
-
-    List<Long> recentVersions;
-    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
-      recentVersions = recentUpdates.getVersions(ulog.getNumRecordsToKeep());
-    } catch (Exception e) {
-      SolrException.log(log, "Corrupt tlog - ignoring.", e);
-      recentVersions = new ArrayList<>(0);
-    }
-
-    List<Long> startingVersions = ulog.getStartingVersions();
-
-    if (startingVersions != null && recoveringAfterStartup) {
-      try {
-        int oldIdx = 0; // index of the start of the old list in the current list
-        long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-
-        for (; oldIdx < recentVersions.size(); oldIdx++) {
-          if (recentVersions.get(oldIdx) == firstStartingVersion) break;
-        }
-
-        if (oldIdx > 0) {
-          log.info("Found new versions added after startup: num=[{}]", oldIdx);
-          log.info("currentVersions size={} range=[{} to {}]", recentVersions.size(), recentVersions.get(0), recentVersions.get(recentVersions.size()-1));
-        }
-
-        if (startingVersions.isEmpty()) {
-          log.info("startupVersions is empty");
-        } else {
-          log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0), startingVersions.get(startingVersions.size()-1));
-        }
-      } catch (Exception e) {
-        SolrException.log(log, "Error getting recent versions.", e);
-        recentVersions = new ArrayList<>(0);
-      }
-    }
-
-    if (recoveringAfterStartup) {
-      // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
-      // when we went down. We may have received updates since then.
-      recentVersions = startingVersions;
-      try {
-        if (ulog.existOldBufferLog()) {
-          // this means we were previously doing a full index replication
-          // that probably didn't complete and buffering updates in the
-          // meantime.
-          log.info("Looks like a previous replication recovery did not complete - skipping peer sync.");
-          firstTime = false; // skip peersync
-        }
-      } catch (Exception e) {
-        SolrException.log(log, "Error trying to get ulog starting operation.", e);
-        firstTime = false; // skip peersync
-      }
-    }
-
-    if (replicaType == Replica.Type.TLOG) {
-      zkController.stopReplicationFromLeader(coreName);
-    }
-
-    final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
-    Future<RecoveryInfo> replayFuture = null;
-    while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
-      try {
-        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
-        // pingLeader only returns null after this strategy has been closed
-        if (leader == null || isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-
-        boolean isLeader = leader.getCoreUrl().equals(ourUrl);
-        if (isLeader && !cloudDesc.isLeader()) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
-        }
-        if (cloudDesc.isLeader()) {
-          // we are now the leader - no one else must have been suitable
-          log.warn("We have not yet recovered - but we are now the leader!");
-          log.info("Finished recovery process.");
-          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          return;
-        }
-
-        log.info("Begin buffering updates. core=[{}]", coreName);
-        // recalling buffer updates will drop the old buffer tlog
-        ulog.bufferUpdates();
-
-        log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
-            ourUrl);
-        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
-
-
-        final Slice slice = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName())
-            .getSlice(cloudDesc.getShardId());
-
-        // abort a prep-recovery request still outstanding from an earlier attempt, if any
-        final HttpUriRequest prevRequest = prevSendPreRecoveryHttpUriRequest;
-        if (prevRequest != null) {
-          prevRequest.abort();
-        }
-
-        if (isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-
-        sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
-
-        if (isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-
-        // we wait a bit so that any updates on the leader
-        // that started before they saw recovering state
-        // are sure to have finished (see SOLR-7141 for
-        // discussion around current value)
-        //TODO since SOLR-11216, we probably won't need this
-        try {
-          Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-
-        // first thing we just try to sync
-        if (firstTime) {
-          firstTime = false; // only try sync the first time through the loop
-          log.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
-          PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core,
-              leader.getCoreUrl(), ulog.getNumRecordsToKeep());
-          boolean syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
-          if (syncSuccess) {
-            SolrQueryRequest req = new LocalSolrQueryRequest(core,
-                new ModifiableSolrParams());
-            try {
-              // force open a new searcher
-              core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
-            } finally {
-              req.close();
-            }
-            log.info("PeerSync stage of recovery was successful.");
-
-            // solrcloud_debug
-            cloudDebugLog(core, "synced");
-
-            log.info("Replaying updates buffered during PeerSync.");
-            replay(core);
-
-            // sync success
-            successfulRecovery = true;
-            return;
-          }
-
-          log.info("PeerSync Recovery was not successful - trying replication.");
-        }
-
-        if (isClosed()) {
-          log.info("RecoveryStrategy has been closed");
-          break;
-        }
-
-        log.info("Starting Replication Recovery.");
-
-        try {
-
-          replicate(zkController.getNodeName(), core, leader);
-
-          if (isClosed()) {
-            log.info("RecoveryStrategy has been closed");
-            break;
-          }
-
-          replayFuture = replay(core);
-
-          if (isClosed()) {
-            log.info("RecoveryStrategy has been closed");
-            break;
-          }
-
-          log.info("Replication Recovery was successful.");
-          successfulRecovery = true;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          log.warn("Recovery was interrupted", e);
-          close = true;
-        } catch (Exception e) {
-          SolrException.log(log, "Error while trying to recover", e);
-        }
-
-      } catch (Exception e) {
-        SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
-      } finally {
-        if (successfulRecovery) {
-          log.info("Registering as Active after recovery.");
-          try {
-            if (replicaType == Replica.Type.TLOG) {
-              zkController.startReplicationFromLeader(coreName, true);
-            }
-            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
-          } catch (Exception e) {
-            log.error("Could not publish as ACTIVE after succesful recovery", e);
-            successfulRecovery = false;
-          }
-
-          if (successfulRecovery) {
-            close = true;
-            recoveryListener.recovered();
-          }
-        }
-      }
-
-      if (!successfulRecovery) {
-        // lets pause for a moment and we need to try again...
-        // TODO: we don't want to retry for some problems?
-        // Or do a fall off retry...
-        try {
-
-          if (isClosed()) {
-            log.info("RecoveryStrategy has been closed");
-            break;
-          }
-
-          log.error("Recovery failed - trying again... ({})", retries);
-
-          retries++;
-          if (retries >= maxRetries) {
-            SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
-            try {
-              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
-            } catch (Exception e) {
-              SolrException.log(log, "Could not publish that recovery failed", e);
-            }
-            break;
-          }
-        } catch (Exception e) {
-          SolrException.log(log, "An error has occurred during recovery", e);
-        }
-
-        try {
-          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
-          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
-          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
-          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
-          int loopCount = retries < 4 ? (int) Math.min(Math.pow(2, retries), 12) : 12;
-          // log the real wait in seconds (not the loop count), matching doReplicateOnlyRecovery
-          log.info("Wait [{}] seconds before trying to recover again (attempt={})",
-              TimeUnit.MILLISECONDS.toSeconds((long) loopCount * startingRecoveryDelayMilliSeconds), retries);
-          for (int i = 0; i < loopCount; i++) {
-            if (isClosed()) {
-              log.info("RecoveryStrategy has been closed");
-              break; // check if someone closed us
-            }
-            Thread.sleep(startingRecoveryDelayMilliSeconds);
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          log.warn("Recovery was interrupted.", e);
-          close = true;
-        }
-      }
-
-    }
-
-    // if replay was skipped (possibly due to pulling a full index from the leader),
-    // then we still need to update version bucket seeds after recovery
-    if (successfulRecovery && replayFuture == null) {
-      log.info("Updating version bucket highest from index after successful recovery.");
-      core.seedVersionBuckets();
-    }
-
-    log.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
-  }
-
-  /**
-   * Loops until the shard leader is known and answers a ping, sleeping 500ms between failed
-   * attempts. On the second attempt it publishes this replica as DOWN if it is still ACTIVE and
-   * {@code mayPutReplicaAsDown} is set.
-   *
-   * @return the leader replica (returned without pinging if it is this core, and also returned when
-   *         the ping fails with an error not caused by an IOException), or {@code null} if this
-   *         strategy was closed
-   */
-  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
-    int numTried = 0;
-    while (true) {
-      CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
-      DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
-      if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
-          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
-        // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
-        zkController.publish(coreDesc, Replica.State.DOWN);
-      }
-      numTried++;
-
-      if (isClosed()) {
-        return null;
-      }
-
-      final Replica leaderReplica;
-      try {
-        leaderReplica = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-      } catch (SolrException e) {
-        Thread.sleep(500);
-        continue;
-      }
-
-      if (leaderReplica.getCoreUrl().equals(ourUrl)) {
-        return leaderReplica;
-      }
-
-      try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
-          .withSocketTimeout(1000)
-          .withConnectionTimeout(1000)
-          .build()) {
-        httpSolrClient.ping();
-        return leaderReplica;
-      } catch (IOException e) {
-        log.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl());
-        Thread.sleep(500);
-      } catch (Exception e) {
-        if (e.getCause() instanceof IOException) {
-          log.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl());
-          Thread.sleep(500);
-        } else {
-          return leaderReplica;
-        }
-      }
-    }
-  }
-
-  // Test hook: when set, it is run at the start of replay().
-  public static Runnable testing_beforeReplayBufferingUpdates;
-
-  /**
-   * Applies the updates buffered during recovery. For TLOG replicas the buffered updates are
-   * copied into a new tlog and {@code null} is returned. Otherwise the buffered updates are
-   * replayed and this method waits for the replay to finish.
-   *
-   * @return the replay future, or {@code null} if nothing was replayed
-   * @throws SolrException if the replay reports failure
-   */
-  final private Future<RecoveryInfo> replay(SolrCore core)
-      throws InterruptedException, ExecutionException {
-    if (testing_beforeReplayBufferingUpdates != null) {
-      testing_beforeReplayBufferingUpdates.run();
-    }
-    if (replicaType == Replica.Type.TLOG) {
-      // roll over all updates during buffering to new tlog, make RTG available
-      SolrQueryRequest req = new LocalSolrQueryRequest(core,
-          new ModifiableSolrParams());
-      try {
-        core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
-      } finally {
-        req.close();
-      }
-      return null;
-    }
-    Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
-    if (future == null) {
-      // no replay needed
-      log.info("No replay needed.");
-    } else {
-      log.info("Replaying buffered documents.");
-      // wait for replay
-      RecoveryInfo report = future.get();
-      if (report.failed) {
-        SolrException.log(log, "Replay failed");
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
-      }
-    }
-
-    // the index may ahead of the tlog's caches after recovery, by calling this tlog's caches will be purged
-    core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
-
-    // solrcloud_debug
-    cloudDebugLog(core, "replayed");
-
-    return future;
-  }
-
-  /** At debug level, logs the node name, the operation and the current total hit count of the core. */
-  final private void cloudDebugLog(SolrCore core, String op) {
-    if (!log.isDebugEnabled()) {
-      return;
-    }
-    try {
-      RefCounted<SolrIndexSearcher> searchHolder = core.getNewestSearcher(false);
-      SolrIndexSearcher searcher = searchHolder.get();
-      try {
-        final int totalHits = searcher.count(new MatchAllDocsQuery());
-        final String nodeName = core.getCoreContainer().getZkController().getNodeName();
-        log.debug("[{}] {} [{} total hits]", nodeName, op, totalHits);
-      } finally {
-        searchHolder.decref();
-      }
-    } catch (Exception e) {
-      log.debug("Error in solrcloud_debug block", e);
-    }
-  }
-
-  final public boolean isClosed() {
-    return close;
-  }
-
-  /**
-   * Sends a WaitForState(RECOVERING) core-admin request to the leader and blocks until it completes.
-   * The request is remembered so that close() or a later attempt can abort it.
-   */
-  final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
-      throws SolrServerException, IOException, InterruptedException, ExecutionException {
-
-    WaitForState prepCmd = new WaitForState();
-    prepCmd.setCoreName(leaderCoreName);
-    prepCmd.setNodeName(zkController.getNodeName());
-    prepCmd.setCoreNodeName(coreZkNodeName);
-    prepCmd.setState(Replica.State.RECOVERING);
-    prepCmd.setCheckLive(true);
-    prepCmd.setOnlyIfLeader(true);
-    final Slice.State state = slice.getState();
-    if (state != Slice.State.CONSTRUCTION && state != Slice.State.RECOVERY && state != Slice.State.RECOVERY_FAILED) {
-      prepCmd.setOnlyIfLeaderActive(true);
-    }
-
-    int conflictWaitMs = zkController.getLeaderConflictResolveWait();
-    // timeout after 5 seconds more than the max timeout (conflictWait + 3 seconds) on the server side
-    int readTimeout = conflictWaitMs + 8000;
-    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
-      client.setConnectionTimeout(10000);
-      client.setSoTimeout(readTimeout);
-      HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
-      prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
-
-      log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd.toString());
-
-      mrr.future.get();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
deleted file mode 100644
index 5fb0946..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-
-import org.apache.lucene.index.IndexCommit;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.SolrConfig;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.IndexFetcher;
-import org.apache.solr.handler.ReplicationHandler;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.SolrIndexWriter;
-import org.apache.solr.update.UpdateLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReplicateFromLeader {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private CoreContainer cc;
- private String coreName;
-
- private ReplicationHandler replicationProcess;
- private long lastVersion = 0;
-
- public ReplicateFromLeader(CoreContainer cc, String coreName) {
- this.cc = cc;
- this.coreName = coreName;
- }
-
- /**
- * Start a replication handler thread that will periodically pull indices from the shard leader
- * @param switchTransactionLog if true, ReplicationHandler will rotate the transaction log once
- * the replication is done
- */
- public void startReplication(boolean switchTransactionLog) throws InterruptedException {
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null) {
- if (cc.isShutDown()) {
- return;
- } else {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getLoadedCoreNames());
- }
- }
- SolrConfig.UpdateHandlerInfo uinfo = core.getSolrConfig().getUpdateHandlerInfo();
- String pollIntervalStr = "00:00:03";
- if (uinfo.autoCommmitMaxTime != -1) {
- pollIntervalStr = toPollIntervalStr(uinfo.autoCommmitMaxTime/2);
- } else if (uinfo.autoSoftCommmitMaxTime != -1) {
- pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2);
- }
- log.info("Will start replication from leader with poll interval: {}", pollIntervalStr );
-
- NamedList<Object> slaveConfig = new NamedList<>();
- slaveConfig.add("fetchFromLeader", Boolean.TRUE);
- slaveConfig.add(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, switchTransactionLog);
- slaveConfig.add("pollInterval", pollIntervalStr);
- NamedList<Object> replicationConfig = new NamedList<>();
- replicationConfig.add("slave", slaveConfig);
-
- String lastCommitVersion = getCommitVersion(core);
- if (lastCommitVersion != null) {
- lastVersion = Long.parseLong(lastCommitVersion);
- }
-
- replicationProcess = new ReplicationHandler();
- if (switchTransactionLog) {
- replicationProcess.setPollListener((solrCore, fetchResult) -> {
- if (fetchResult == IndexFetcher.IndexFetchResult.INDEX_FETCH_SUCCESS) {
- String commitVersion = getCommitVersion(core);
- if (commitVersion == null) return;
- if (Long.parseLong(commitVersion) == lastVersion) return;
- UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
- SolrQueryRequest req = new LocalSolrQueryRequest(core,
- new ModifiableSolrParams());
- CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
- cuc.setVersion(Long.parseLong(commitVersion));
- updateLog.commitAndSwitchToNewTlog(cuc);
- lastVersion = Long.parseLong(commitVersion);
- }
- });
- }
- replicationProcess.init(replicationConfig);
- replicationProcess.inform(core);
- }
- }
-
- public static String getCommitVersion(SolrCore solrCore) {
- IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
- try {
- String commitVersion = commit.getUserData().get(SolrIndexWriter.COMMIT_COMMAND_VERSION);
- if (commitVersion == null) return null;
- else return commitVersion;
- } catch (Exception e) {
- log.warn("Cannot get commit command version from index commit point ",e);
- return null;
- }
- }
-
- private static String toPollIntervalStr(int ms) {
- int sec = ms/1000;
- int hour = sec / 3600;
- sec = sec % 3600;
- int min = sec / 60;
- sec = sec % 60;
- return hour + ":" + min + ":" + sec;
- }
-
- public void stopReplication() {
- if (replicationProcess != null) {
- replicationProcess.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java b/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
deleted file mode 100644
index 0cb6cbe..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/SizeLimitedDistributedMap.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.util.List;
-import org.apache.lucene.util.PriorityQueue;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * A size limited distributed map maintained in zk.
- * Oldest znodes (as per modification time) are evicted as newer ones come in.
- *
- * When the map hits the specified maximum size, the oldest <code>maxSize / 10</code> items
- * (at least one) are evicted on the next {@link #put(String, byte[])} invocation.
- */
-public class SizeLimitedDistributedMap extends DistributedMap {
-
-  private final int maxSize;
-
-  /**
-   * This observer will be called when this map overflows, and deletes the excess of elements
-   */
-  private final OnOverflowObserver onOverflowObserver;
-
-  public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize) {
-    this(zookeeper, dir, maxSize, null);
-  }
-
-  public SizeLimitedDistributedMap(SolrZkClient zookeeper, String dir, int maxSize, OnOverflowObserver onOverflowObserver) {
-    super(zookeeper, dir);
-    this.maxSize = maxSize;
-    this.onOverflowObserver = onOverflowObserver;
-  }
-
-  /**
-   * Stores {@code data} under {@code trackingId}. If the map already holds {@code maxSize} or
-   * more entries, the oldest entries (lowest mzxid) are deleted first. Each deletion is reported
-   * to the overflow observer, if one is set.
-   */
-  @Override
-  public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
-    if (this.size() >= maxSize) {
-      // Bring down the size
-      List<String> children = zookeeper.getChildren(dir, null, true);
-
-      // maxSize / 10 is 0 for maxSize < 10. A zero-capacity queue never accepts an element, so
-      // unboxing its null top() below would throw. Always evict at least one entry.
-      int cleanupSize = Math.max(1, maxSize / 10);
-
-      // Inverted ordering: the queue retains the cleanupSize smallest mzxids, and top() is the
-      // largest of those. That value is the eviction cut-off.
-      final PriorityQueue<Long> priorityQueue = new PriorityQueue<Long>(cleanupSize) {
-        @Override
-        protected boolean lessThan(Long a, Long b) {
-          return (a > b);
-        }
-      };
-
-      for (String child : children) {
-        Stat stat = zookeeper.exists(dir + "/" + child, null, true);
-        // exists() returns null for a node removed after getChildren; skip it.
-        if (stat != null) {
-          priorityQueue.insertWithOverflow(stat.getMzxid());
-        }
-      }
-
-      // No stats were collected (e.g. no children left), so there is nothing to evict.
-      if (priorityQueue.size() > 0) {
-        long topElementMzxId = priorityQueue.top();
-
-        for (String child : children) {
-          Stat stat = zookeeper.exists(dir + "/" + child, null, true);
-          if (stat != null && stat.getMzxid() <= topElementMzxId) {
-            zookeeper.delete(dir + "/" + child, -1, true);
-            if (onOverflowObserver != null) onOverflowObserver.onChildDelete(child.substring(PREFIX.length()));
-          }
-        }
-      }
-    }
-
-    super.put(trackingId, data);
-  }
-
-  /** Callback invoked once for each child deleted while shrinking an overflowing map. */
-  interface OnOverflowObserver {
-    void onChildDelete(String child) throws KeeperException, InterruptedException;
-  }
-}