Posted to commits@lucene.apache.org by ma...@apache.org on 2018/11/29 18:19:24 UTC
[15/16] lucene-solr:master: SOLR-12801: Make massive improvements to the tests.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 74781d7..91b7e74 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -16,6 +16,8 @@
*/
package org.apache.solr.cloud;
+import static org.apache.solr.common.params.CommonParams.ID;
+
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -26,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.codahale.metrics.Timer;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
@@ -39,9 +40,11 @@ import org.apache.solr.cloud.overseer.ReplicaMutator;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -53,7 +56,7 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.HttpShardHandler;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.zookeeper.CreateMode;
@@ -61,7 +64,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.params.CommonParams.ID;
+import com.codahale.metrics.Timer;
/**
* Cluster leader. Responsible for processing state updates, node assignments, creating/deleting
@@ -107,7 +110,7 @@ public class Overseer implements SolrCloseable {
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
this.zkClient = reader.getZkClient();
this.zkStats = zkStats;
- this.stateUpdateQueue = getStateUpdateQueue(zkClient, zkStats);
+ this.stateUpdateQueue = getStateUpdateQueue(zkStats);
this.workQueue = getInternalWorkQueue(zkClient, zkStats);
this.failureMap = getFailureMap(zkClient);
this.runningMap = getRunningMap(zkClient);
@@ -188,6 +191,8 @@ public class Overseer implements SolrCloseable {
// the workQueue is empty now, use stateUpdateQueue as fallback queue
fallbackQueue = stateUpdateQueue;
fallbackQueueSize = 0;
+ } catch (AlreadyClosedException e) {
+ return;
} catch (KeeperException.SessionExpiredException e) {
log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
return;
@@ -211,6 +216,8 @@ public class Overseer implements SolrCloseable {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
}
@@ -247,6 +254,8 @@ public class Overseer implements SolrCloseable {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
refreshClusterState = true; // it might have been a bad version error
@@ -308,8 +317,10 @@ public class Overseer implements SolrCloseable {
byte[] data;
try {
data = zkClient.getData(path, null, stat, true);
+ } catch (AlreadyClosedException e) {
+ return;
} catch (Exception e) {
- log.error("could not read the "+path+" data" ,e);
+ log.warn("Error communicating with ZooKeeper", e);
return;
}
try {
@@ -437,6 +448,11 @@ public class Overseer implements SolrCloseable {
} catch (InterruptedException e) {
success = false;
Thread.currentThread().interrupt();
+ } catch (AlreadyClosedException e) {
+ success = false;
+ } catch (Exception e) {
+ success = false;
+ log.warn("Unexpected exception", e);
} finally {
timerContext.stop();
if (success) {
@@ -495,7 +511,7 @@ public class Overseer implements SolrCloseable {
private final ZkStateReader reader;
- private final ShardHandler shardHandler;
+ private final HttpShardHandler shardHandler;
private final UpdateShardHandler updateShardHandler;
@@ -507,11 +523,11 @@ public class Overseer implements SolrCloseable {
private Stats stats;
private String id;
- private boolean closed;
+ private volatile boolean closed;
private CloudConfig config;
// overseer not responsible for closing reader
- public Overseer(ShardHandler shardHandler,
+ public Overseer(HttpShardHandler shardHandler,
UpdateShardHandler updateShardHandler, String adminPath,
final ZkStateReader reader, ZkController zkController, CloudConfig config)
throws KeeperException, InterruptedException {
@@ -541,7 +557,7 @@ public class Overseer implements SolrCloseable {
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, adminPath, shardHandler.getShardHandlerFactory());
+ OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getDefaultHttpClient());
overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
ccThread.setDaemon(true);
@@ -554,9 +570,8 @@ public class Overseer implements SolrCloseable {
updaterThread.start();
ccThread.start();
triggerThread.start();
- if (this.id != null) {
- assert ObjectReleaseTracker.track(this);
- }
+
+ assert ObjectReleaseTracker.track(this);
}
public Stats getStats() {
@@ -595,16 +610,13 @@ public class Overseer implements SolrCloseable {
}
public synchronized void close() {
- if (closed) return;
if (this.id != null) {
log.info("Overseer (id=" + id + ") closing");
}
-
- doClose();
this.closed = true;
- if (this.id != null) {
- assert ObjectReleaseTracker.release(this);
- }
+ doClose();
+
+ assert ObjectReleaseTracker.release(this);
}
@Override
@@ -660,11 +672,10 @@ public class Overseer implements SolrCloseable {
* <p>
* This method will create the /overseer znode in ZooKeeper if it does not exist already.
*
- * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link ZkDistributedQueue} object
*/
- public static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient) {
- return getStateUpdateQueue(zkClient, new Stats());
+ ZkDistributedQueue getStateUpdateQueue() {
+ return getStateUpdateQueue(new Stats());
}
/**
@@ -672,13 +683,15 @@ public class Overseer implements SolrCloseable {
* This method should not be used directly by anyone other than the Overseer itself.
* This method will create the /overseer znode in ZooKeeper if it does not exist already.
*
- * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @param zkStats a {@link Stats} object which tracks statistics for all zookeeper operations performed by this queue
* @return a {@link ZkDistributedQueue} object
*/
- static ZkDistributedQueue getStateUpdateQueue(final SolrZkClient zkClient, Stats zkStats) {
- createOverseerNode(zkClient);
- return new ZkDistributedQueue(zkClient, "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE);
+ ZkDistributedQueue getStateUpdateQueue(Stats zkStats) {
+ return new ZkDistributedQueue(reader.getZkClient(), "/overseer/queue", zkStats, STATE_UPDATE_MAX_QUEUE, new ConnectionManager.IsClosed(){
+ public boolean isClosed() {
+ return Overseer.this.isClosed() || zkController.getCoreContainer().isShutDown();
+ }
+ });
}
/**
@@ -697,31 +710,26 @@ public class Overseer implements SolrCloseable {
* @return a {@link ZkDistributedQueue} object
*/
static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
- createOverseerNode(zkClient);
return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
}
/* Internal map for failed tasks, not to be used outside of the Overseer */
static DistributedMap getRunningMap(final SolrZkClient zkClient) {
- createOverseerNode(zkClient);
return new DistributedMap(zkClient, "/overseer/collection-map-running");
}
/* Size-limited map for successfully completed tasks*/
static DistributedMap getCompletedMap(final SolrZkClient zkClient) {
- createOverseerNode(zkClient);
return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-completed", NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
}
/* Map for failed tasks, not to be used outside of the Overseer */
static DistributedMap getFailureMap(final SolrZkClient zkClient) {
- createOverseerNode(zkClient);
return new SizeLimitedDistributedMap(zkClient, "/overseer/collection-map-failure", NUM_RESPONSES_TO_STORE, (child) -> getAsyncIdsMap(zkClient).remove(child));
}
/* Map of async IDs currently in use*/
static DistributedMap getAsyncIdsMap(final SolrZkClient zkClient) {
- createOverseerNode(zkClient);
return new DistributedMap(zkClient, "/overseer/async_ids");
}
@@ -740,7 +748,7 @@ public class Overseer implements SolrCloseable {
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link ZkDistributedQueue} object
*/
- static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
+ OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient) {
return getCollectionQueue(zkClient, new Stats());
}
@@ -758,8 +766,7 @@ public class Overseer implements SolrCloseable {
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link ZkDistributedQueue} object
*/
- static OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
- createOverseerNode(zkClient);
+ OverseerTaskQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
return new OverseerTaskQueue(zkClient, "/overseer/collection-queue-work", zkStats);
}
@@ -778,7 +785,7 @@ public class Overseer implements SolrCloseable {
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link ZkDistributedQueue} object
*/
- static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) {
+ OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient) {
return getConfigSetQueue(zkClient, new Stats());
}
@@ -801,15 +808,14 @@ public class Overseer implements SolrCloseable {
* @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
* @return a {@link ZkDistributedQueue} object
*/
- static OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
+ OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats) {
// For now, we use the same queue as the collection queue, but ensure
// that the actions are prefixed with a unique string.
- createOverseerNode(zkClient);
return getCollectionQueue(zkClient, zkStats);
}
- private static void createOverseerNode(final SolrZkClient zkClient) {
+ private void createOverseerNode(final SolrZkClient zkClient) {
try {
zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException e) {
@@ -823,6 +829,7 @@ public class Overseer implements SolrCloseable {
throw new RuntimeException(e);
}
}
+
public static boolean isLegacy(ZkStateReader stateReader) {
String legacyProperty = stateReader.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
return "true".equals(legacyProperty);
@@ -837,4 +844,11 @@ public class Overseer implements SolrCloseable {
return reader;
}
+ public void offerStateUpdate(byte[] data) throws KeeperException, InterruptedException {
+ if (zkController.getZkClient().isClosed()) {
+ throw new AlreadyClosedException();
+ }
+ getStateUpdateQueue().offer(data);
+ }
+
}
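
The new offerStateUpdate(byte[]) method above checks the ZooKeeper client before enqueueing, so callers fail fast with AlreadyClosedException during shutdown instead of blocking on a dead connection. A minimal standalone sketch of that pattern follows (not part of the commit; the wrapper class and its fields are illustrative, while SolrZkClient.isClosed(), ZkDistributedQueue.offer(byte[]) and AlreadyClosedException are the real Solr types used in the diff):

import org.apache.solr.cloud.ZkDistributedQueue;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.cloud.SolrZkClient;

// Illustrative wrapper only: same fail-fast shape as Overseer.offerStateUpdate(byte[]).
class ShutdownAwareOfferSketch {
  private final SolrZkClient zkClient;    // the client the Overseer already holds
  private final ZkDistributedQueue queue; // e.g. the /overseer/queue state update queue

  ShutdownAwareOfferSketch(SolrZkClient zkClient, ZkDistributedQueue queue) {
    this.zkClient = zkClient;
    this.queue = queue;
  }

  void offer(byte[] data) throws Exception {
    if (zkClient.isClosed()) {
      throw new AlreadyClosedException(); // surface shutdown instead of hanging
    }
    queue.offer(data);
  }
}

The same idea drives the ConnectionManager.IsClosed callback handed to ZkDistributedQueue in getStateUpdateQueue(Stats): the queue can stop work as soon as the Overseer or its CoreContainer reports shutdown.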
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
index e8d85ce..78ddc82 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
@@ -16,16 +16,16 @@
*/
package org.apache.solr.cloud;
+import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
+
import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
-
-import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
+import org.apache.solr.handler.component.HttpShardHandler;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
/**
* An {@link OverseerTaskProcessor} that handles:
@@ -35,18 +35,18 @@ import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_A
public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor {
public OverseerCollectionConfigSetProcessor(ZkStateReader zkStateReader, String myId,
- final ShardHandler shardHandler,
+ final HttpShardHandler shardHandler,
String adminPath, Stats stats, Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer) {
this(
zkStateReader,
myId,
- shardHandler.getShardHandlerFactory(),
+ (HttpShardHandlerFactory) shardHandler.getShardHandlerFactory(),
adminPath,
stats,
overseer,
overseerNodePrioritizer,
- Overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
+ overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
Overseer.getRunningMap(zkStateReader.getZkClient()),
Overseer.getCompletedMap(zkStateReader.getZkClient()),
Overseer.getFailureMap(zkStateReader.getZkClient())
@@ -54,7 +54,7 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
}
protected OverseerCollectionConfigSetProcessor(ZkStateReader zkStateReader, String myId,
- final ShardHandlerFactory shardHandlerFactory,
+ final HttpShardHandlerFactory shardHandlerFactory,
String adminPath,
Stats stats,
Overseer overseer,
@@ -79,7 +79,7 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
private static OverseerMessageHandlerSelector getOverseerMessageHandlerSelector(
ZkStateReader zkStateReader,
String myId,
- final ShardHandlerFactory shardHandlerFactory,
+ final HttpShardHandlerFactory shardHandlerFactory,
String adminPath,
Stats stats,
Overseer overseer,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
index 34ee041..6851141 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
@@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
+import org.apache.http.client.HttpClient;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -28,6 +29,7 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
@@ -49,10 +51,16 @@ public class OverseerNodePrioritizer {
private final String adminPath;
private final ShardHandlerFactory shardHandlerFactory;
- public OverseerNodePrioritizer(ZkStateReader zkStateReader, String adminPath, ShardHandlerFactory shardHandlerFactory) {
+ private ZkDistributedQueue stateUpdateQueue;
+
+ private HttpClient httpClient;
+
+ public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory, HttpClient httpClient) {
this.zkStateReader = zkStateReader;
this.adminPath = adminPath;
this.shardHandlerFactory = shardHandlerFactory;
+ this.stateUpdateQueue = stateUpdateQueue;
+ this.httpClient = httpClient;
}
public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
@@ -88,7 +96,7 @@ public class OverseerNodePrioritizer {
invokeOverseerOp(electionNodes.get(1), "rejoin");//ask second inline to go behind
}
//now ask the current leader to QUIT , so that the designate can takeover
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(
+ stateUpdateQueue.offer(
Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
ID, OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()))));
@@ -96,7 +104,7 @@ public class OverseerNodePrioritizer {
private void invokeOverseerOp(String electionNode, String op) {
ModifiableSolrParams params = new ModifiableSolrParams();
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(httpClient);
params.set(CoreAdminParams.ACTION, CoreAdminAction.OVERSEEROP.toString());
params.set("op", op);
params.set("qt", adminPath);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index febeec0..3b53a54 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -36,6 +37,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -86,13 +88,13 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
// List of completed tasks. This is used to clean up workQueue in zk.
final private HashMap<String, QueueEvent> completedTasks;
- private String myId;
+ private volatile String myId;
- private ZkStateReader zkStateReader;
+ private volatile ZkStateReader zkStateReader;
private boolean isClosed;
- private Stats stats;
+ private volatile Stats stats;
// Set of tasks that have been picked up for processing but not cleaned up from zk work-queue.
// It may contain tasks that have completed execution, have been entered into the completed/failed map in zk but not
@@ -102,7 +104,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
// be executed because they are blocked or the execution queue is full
// This is an optimization to ensure that we do not read the same tasks
// again and again from ZK.
- final private Map<String, QueueEvent> blockedTasks = new LinkedHashMap<>();
+ final private Map<String, QueueEvent> blockedTasks = Collections.synchronizedMap(new LinkedHashMap<>());
final private Predicate<String> excludedTasks = new Predicate<String>() {
@Override
public boolean test(String s) {
@@ -170,6 +172,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
// We don't need to handle this. This is just a fail-safe which comes in handy in skipping already processed
// async calls.
SolrException.log(log, "", e);
+ } catch (AlreadyClosedException e) {
+ return;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -181,6 +185,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
try {
prioritizer.prioritizeOverseerNodes(myId);
+ } catch (AlreadyClosedException e) {
+ return;
} catch (Exception e) {
if (!zkStateReader.getZkClient().isClosed()) {
log.error("Unable to prioritize overseer ", e);
@@ -203,14 +209,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
continue; // not a no, not a yes, try asking again
}
- log.debug("Cleaning up work-queue. #Running tasks: {}", runningTasks.size());
+ log.debug("Cleaning up work-queue. #Running tasks: {} #Completed tasks: {}", runningTasksSize(), completedTasks.size());
cleanUpWorkQueue();
printTrackingMaps();
boolean waited = false;
- while (runningTasks.size() > MAX_PARALLEL_TASKS) {
+ while (runningTasksSize() > MAX_PARALLEL_TASKS) {
synchronized (waitLock) {
waitLock.wait(100);//wait for 100 ms or till a task is complete
}
@@ -229,7 +235,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
// to clear out at least a few items in the queue before we read more items
if (heads.size() < MAX_BLOCKED_TASKS) {
//instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
- int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasks.size());
+ int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasksSize());
List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 2000L);
log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
heads.addAll(newTasks);
@@ -251,7 +257,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
for (QueueEvent head : heads) {
if (!tooManyTasks) {
synchronized (runningTasks) {
- tooManyTasks = runningTasks.size() >= MAX_PARALLEL_TASKS;
+ tooManyTasks = runningTasksSize() >= MAX_PARALLEL_TASKS;
}
}
if (tooManyTasks) {
@@ -260,7 +266,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
blockedTasks.put(head.getId(), head);
continue;
}
- if (runningZKTasks.contains(head.getId())) continue;
+ synchronized (runningZKTasks) {
+ if (runningZKTasks.contains(head.getId())) continue;
+ }
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) {
@@ -316,6 +324,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
SolrException.log(log, "", e);
}
@@ -325,11 +335,19 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
}
+ private int runningTasksSize() {
+ synchronized (runningTasks) {
+ return runningTasks.size();
+ }
+ }
+
private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
synchronized (completedTasks) {
for (String id : completedTasks.keySet()) {
workQueue.remove(completedTasks.get(id));
- runningZKTasks.remove(id);
+ synchronized (runningTasks) {
+ runningZKTasks.remove(id);
+ }
}
completedTasks.clear();
}
@@ -502,6 +520,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.debug(messageHandler.getName() + ": Message id:" + head.getId() +
" complete, response:" + response.getResponse().toString());
success = true;
+ } catch (AlreadyClosedException e) {
+
} catch (KeeperException e) {
SolrException.log(log, "", e);
} catch (InterruptedException e) {
@@ -513,7 +533,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
lock.unlock();
if (!success) {
// Reset task from tracking data structures so that it can be retried.
- resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
+ try {
+ resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
+ } catch(AlreadyClosedException e) {
+
+ }
}
synchronized (waitLock){
waitLock.notifyAll();
@@ -587,7 +611,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.debug("CompletedTasks: {}", completedTasks.keySet().toString());
}
synchronized (runningZKTasks) {
- log.debug("RunningZKTasks: {}", runningZKTasks.toString());
+ log.info("RunningZKTasks: {}", runningZKTasks.toString());
}
}
}
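
OverseerTaskProcessor now reads its tracking collections only under their monitors and wraps blockedTasks in Collections.synchronizedMap. A self-contained sketch of that guarded-access pattern (not part of the commit; the class and field names are illustrative):

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Illustrative only: mirrors the runningTasksSize() and runningZKTasks guards added above.
class TaskTrackerSketch {
  private final Set<String> runningTasks = new HashSet<>();
  private final Map<String, Object> blockedTasks = Collections.synchronizedMap(new LinkedHashMap<>());

  int runningTasksSize() {
    synchronized (runningTasks) { // size() is only read while holding the monitor
      return runningTasks.size();
    }
  }

  boolean isRunning(String taskId) {
    synchronized (runningTasks) {
      return runningTasks.contains(taskId);
    }
  }
}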
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 67c15e8..9133266 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -63,7 +63,6 @@ import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.PeerSyncWithLeader;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.SolrPluginUtils;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
@@ -71,18 +70,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class may change in future and customisations are not supported
- * between versions in terms of API or back compat behaviour.
+ * This class may change in future and customisations are not supported between versions in terms of API or back compat
+ * behaviour.
+ *
* @lucene.experimental
*/
public class RecoveryStrategy implements Runnable, Closeable {
public static class Builder implements NamedListInitializedPlugin {
private NamedList args;
+
@Override
public void init(NamedList args) {
this.args = args;
}
+
// this should only be used from SolrCoreState
public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd,
RecoveryStrategy.RecoveryListener recoveryListener) {
@@ -90,6 +92,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
SolrPluginUtils.invokeSetters(recoveryStrategy, args);
return recoveryStrategy;
}
+
protected RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
RecoveryStrategy.RecoveryListener recoveryListener) {
return new RecoveryStrategy(cc, cd, recoveryListener);
@@ -98,15 +101,17 @@ public class RecoveryStrategy implements Runnable, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500);
+ private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer
+ .getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500);
private int maxRetries = 500;
- private int startingRecoveryDelayMilliSeconds = 5000;
+ private int startingRecoveryDelayMilliSeconds = 2000;
public static interface RecoveryListener {
public void recovered();
+
public void failed();
}
-
+
private volatile boolean close = false;
private RecoveryListener recoveryListener;
@@ -121,6 +126,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
private final Replica.Type replicaType;
+ private CoreDescriptor coreDescriptor;
+
protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = cd.getName();
@@ -136,7 +143,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
return waitForUpdatesWithStaleStatePauseMilliSeconds;
}
- final public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(int waitForUpdatesWithStaleStatePauseMilliSeconds) {
+ final public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(
+ int waitForUpdatesWithStaleStatePauseMilliSeconds) {
this.waitForUpdatesWithStaleStatePauseMilliSeconds = waitForUpdatesWithStaleStatePauseMilliSeconds;
}
@@ -185,10 +193,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
recoveryListener.failed();
}
}
-
+
/**
- * This method may change in future and customisations are not supported
- * between versions in terms of API or back compat behaviour.
+ * This method may change in future and customisations are not supported between versions in terms of API or back
+ * compat behaviour.
+ *
* @lucene.experimental
*/
protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
@@ -199,37 +208,38 @@ public class RecoveryStrategy implements Runnable, Closeable {
throws SolrServerException, IOException {
final String leaderUrl = getReplicateLeaderUrl(leaderprops);
-
+
log.info("Attempting to replicate from [{}].", leaderUrl);
-
+
// send commit
commitOnLeader(leaderUrl);
-
+
// use rep handler directly, so we can do this sync rather than async
SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
ReplicationHandler replicationHandler = (ReplicationHandler) handler;
-
+
if (replicationHandler == null) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
}
-
+
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
solrParams.set(ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO, replicaType == Replica.Type.TLOG);
// always download the tlogs from the leader when running with cdcr enabled. We need to have all the tlogs
// to ensure leader failover doesn't cause missing docs on the target
- if (core.getUpdateHandler().getUpdateLog() != null && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
+ if (core.getUpdateHandler().getUpdateLog() != null
+ && core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog) {
solrParams.set(ReplicationHandler.TLOG_FILES, true);
}
-
+
if (isClosed()) return; // we check closed on return
boolean success = replicationHandler.doFetch(solrParams, false).getSuccessful();
-
+
if (!success) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
}
-
+
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
@@ -245,7 +255,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
+ " from "
+ leaderUrl
+ " gen:"
- + (core.getDeletionPolicy().getLatestCommit() != null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration())
+ + (core.getDeletionPolicy().getLatestCommit() != null ? "null"
+ : core.getDeletionPolicy().getLatestCommit().getGeneration())
+ " data:" + core.getDataDir()
+ " index:" + core.getIndexDir()
+ " newIndex:" + core.getNewIndexDir()
@@ -265,11 +276,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
IOException {
try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl)
.withConnectionTimeout(30000)
+ .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient())
.build()) {
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
- ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-// ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if "onlyLeaderIndexes"?
+ // ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+ // ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if
+ // "onlyLeaderIndexes"?
ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
client);
@@ -304,9 +317,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
MDCLoggingContext.clear();
}
}
-
+
final public void doRecovery(SolrCore core) throws Exception {
- if (core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) {
+ // we can lose our core descriptor, so store it now
+ this.coreDescriptor = core.getCoreDescriptor();
+
+ if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) {
doSyncOrReplicateRecovery(core);
} else {
doReplicateOnlyRecovery(core);
@@ -316,14 +332,17 @@ public class RecoveryStrategy implements Runnable, Closeable {
final private void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
boolean successfulRecovery = false;
-// if (core.getUpdateHandler().getUpdateLog() != null) {
-// SolrException.log(log, "'replicate-only' recovery strategy should only be used if no update logs are present, but this core has one: "
-// + core.getUpdateHandler().getUpdateLog());
-// return;
-// }
- while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+ // if (core.getUpdateHandler().getUpdateLog() != null) {
+ // SolrException.log(log, "'replicate-only' recovery strategy should only be used if no update logs are present, but
+ // this core has one: "
+ // + core.getUpdateHandler().getUpdateLog());
+ // return;
+ // }
+ while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or
+ // it will close channels
+ // though
try {
- CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+ CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
@@ -333,7 +352,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
- boolean isLeader = leaderUrl.equals(ourUrl); //TODO: We can probably delete most of this code if we say this strategy can only be used for pull replicas
+ boolean isLeader = leaderUrl.equals(ourUrl); // TODO: We can probably delete most of this code if we say this
+ // strategy can only be used for pull replicas
if (isLeader && !cloudDesc.isLeader()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
}
@@ -342,14 +362,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader!");
log.info("Finished recovery process.");
- zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+ zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
return;
}
-
log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
ourUrl);
- zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
+ zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
if (isClosed()) {
log.info("Recovery for core {} has been closed", core.getName());
@@ -381,7 +400,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.startReplicationFromLeader(coreName, false);
log.info("Registering as Active after recovery.");
try {
- zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+ zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
} catch (Exception e) {
log.error("Could not publish as ACTIVE after succesful recovery", e);
successfulRecovery = false;
@@ -411,7 +430,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (retries >= maxRetries) {
SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
try {
- recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+ recoveryFailed(core, zkController, baseUrl, coreZkNodeName, this.coreDescriptor);
} catch (Exception e) {
SolrException.log(log, "Could not publish that recovery failed", e);
}
@@ -457,7 +476,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover.");
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
- core.getCoreDescriptor());
+ this.coreDescriptor);
return;
}
@@ -478,20 +497,22 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
int oldIdx = 0; // index of the start of the old list in the current list
long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
-
+
for (; oldIdx < recentVersions.size(); oldIdx++) {
if (recentVersions.get(oldIdx) == firstStartingVersion) break;
}
-
+
if (oldIdx > 0) {
log.info("Found new versions added after startup: num=[{}]", oldIdx);
- log.info("currentVersions size={} range=[{} to {}]", recentVersions.size(), recentVersions.get(0), recentVersions.get(recentVersions.size()-1));
+ log.info("currentVersions size={} range=[{} to {}]", recentVersions.size(), recentVersions.get(0),
+ recentVersions.get(recentVersions.size() - 1));
}
if (startingVersions.isEmpty()) {
log.info("startupVersions is empty");
} else {
- log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0), startingVersions.get(startingVersions.size()-1));
+ log.info("startupVersions size={} range=[{} to {}]", startingVersions.size(), startingVersions.get(0),
+ startingVersions.get(startingVersions.size() - 1));
}
} catch (Exception e) {
SolrException.log(log, "Error getting recent versions.", e);
@@ -501,7 +522,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (recoveringAfterStartup) {
// if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
- // when we went down. We may have received updates since then.
+ // when we went down. We may have received updates since then.
recentVersions = startingVersions;
try {
if (ulog.existOldBufferLog()) {
@@ -523,10 +544,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
Future<RecoveryInfo> replayFuture = null;
- while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+ while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or
+ // it will close channels
+ // though
try {
- CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
- final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
+ CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
+ final Replica leader = pingLeader(ourUrl, this.coreDescriptor, true);
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
break;
@@ -540,7 +563,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader!");
log.info("Finished recovery process.");
- zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+ zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
return;
}
@@ -548,37 +571,37 @@ public class RecoveryStrategy implements Runnable, Closeable {
// recalling buffer updates will drop the old buffer tlog
ulog.bufferUpdates();
- log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
+ log.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(),
+ leader.getCoreUrl(),
ourUrl);
- zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
-
-
+ zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
+
final Slice slice = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName())
.getSlice(cloudDesc.getShardId());
-
+
try {
prevSendPreRecoveryHttpUriRequest.abort();
} catch (NullPointerException e) {
// okay
}
-
+
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
break;
}
sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
-
+
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
break;
}
-
+
// we wait a bit so that any updates on the leader
- // that started before they saw recovering state
+ // that started before they saw recovering state
// are sure to have finished (see SOLR-7141 for
// discussion around current value)
- //TODO since SOLR-11216, we probably won't need this
+ // TODO since SOLR-11216, we probably won't need this
try {
Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
} catch (InterruptedException e) {
@@ -588,7 +611,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
- log.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
+ log.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(),
+ recoveringAfterStartup);
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core,
@@ -604,7 +628,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
// solrcloud_debug
cloudDebugLog(core, "synced");
-
+
log.info("Replaying updates buffered during PeerSync.");
replayFuture = replay(core);
@@ -620,7 +644,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("RecoveryStrategy has been closed");
break;
}
-
+
log.info("Starting Replication Recovery.");
try {
@@ -658,12 +682,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (replicaType == Replica.Type.TLOG) {
zkController.startReplicationFromLeader(coreName, true);
}
- zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+ zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
} catch (Exception e) {
log.error("Could not publish as ACTIVE after succesful recovery", e);
successfulRecovery = false;
}
-
+
if (successfulRecovery) {
close = true;
recoveryListener.recovered();
@@ -681,14 +705,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("RecoveryStrategy has been closed");
break;
}
-
+
log.error("Recovery failed - trying again... (" + retries + ")");
-
+
retries++;
if (retries >= maxRetries) {
SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
try {
- recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+ recoveryFailed(core, zkController, baseUrl, coreZkNodeName, this.coreDescriptor);
} catch (Exception e) {
SolrException.log(log, "Could not publish that recovery failed", e);
}
@@ -699,12 +723,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
try {
- // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
- // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
- // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
- // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
- double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
- log.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
+ // Wait an exponential interval between retries, start at 2 seconds and work up to a minute.
+ // Since we sleep at 2 seconds sub-intervals in
+ // order to check if we were closed, 30 is chosen as the maximum loopCount (2s * 30 = 1m).
+ double loopCount = Math.min(Math.pow(2, retries - 1), 30);
+ log.info("Wait [{}] seconds before trying to recover again (attempt={})",
+ loopCount * startingRecoveryDelayMilliSeconds, retries);
for (int i = 0; i < loopCount; i++) {
if (isClosed()) {
log.info("RecoveryStrategy has been closed");
@@ -731,13 +755,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
- private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+ private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
+ throws Exception {
int numTried = 0;
while (true) {
CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
if (!isClosed() && mayPutReplicaAsDown && numTried == 1 &&
- docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
+ docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName())
+ .getState() == Replica.State.ACTIVE) {
// this operation may take a long time, by putting replica into DOWN state, client won't query this replica
zkController.publish(coreDesc, Replica.State.DOWN);
}
@@ -763,6 +789,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
.withSocketTimeout(1000)
.withConnectionTimeout(1000)
+ .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient())
.build()) {
SolrPingResponse resp = httpSolrClient.ping();
return leaderReplica;
@@ -811,13 +838,13 @@ public class RecoveryStrategy implements Runnable, Closeable {
// the index may ahead of the tlog's caches after recovery, by calling this tlog's caches will be purged
core.getUpdateHandler().getUpdateLog().openRealtimeSearcher();
-
+
// solrcloud_debug
cloudDebugLog(core, "replayed");
-
+
return future;
}
-
+
final private void cloudDebugLog(SolrCore core, String op) {
if (!log.isDebugEnabled()) {
return;
@@ -838,9 +865,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
final public boolean isClosed() {
- return close;
+ return close || cc.isShutDown();
}
-
+
final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
throws SolrServerException, IOException, InterruptedException, ExecutionException {
@@ -858,8 +885,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
int conflictWaitMs = zkController.getLeaderConflictResolveWait();
// timeout after 5 seconds more than the max timeout (conflictWait + 3 seconds) on the server side
- int readTimeout = conflictWaitMs + 8000;
- try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
+ int readTimeout = conflictWaitMs + Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "8000"));
+ try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl)
+ .withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient()).build()) {
client.setConnectionTimeout(10000);
client.setSoTimeout(readTimeout);
HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
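
The retry wait in RecoveryStrategy now starts from a 2 second delay (startingRecoveryDelayMilliSeconds changed from 5000 to 2000) and doubles per attempt up to a one minute cap, sleeping in 2 second sub-intervals so close() is noticed quickly. A runnable sketch of just that backoff computation (not part of the commit; the class and the BooleanSupplier stand in for RecoveryStrategy and its isClosed() check):

// Illustrative backoff: loopCount = min(2^(retries-1), 30), i.e. at most 30 * 2s = 1 minute.
class RecoveryBackoffSketch {
  private static final int STARTING_RECOVERY_DELAY_MS = 2000;

  static void waitBeforeRetry(int retries, java.util.function.BooleanSupplier isClosed)
      throws InterruptedException {
    double loopCount = Math.min(Math.pow(2, retries - 1), 30);
    for (int i = 0; i < loopCount; i++) {
      if (isClosed.getAsBoolean()) {
        return; // recovery has been closed; stop waiting immediately
      }
      Thread.sleep(STARTING_RECOVERY_DELAY_MS);
    }
  }
}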
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index f881b5d..957b321 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -39,11 +39,11 @@ import org.slf4j.LoggerFactory;
public class ReplicateFromLeader {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private CoreContainer cc;
- private String coreName;
+ private final CoreContainer cc;
+ private final String coreName;
- private ReplicationHandler replicationProcess;
- private long lastVersion = 0;
+ private volatile ReplicationHandler replicationProcess;
+ private volatile long lastVersion = 0;
public ReplicateFromLeader(CoreContainer cc, String coreName) {
this.cc = cc;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
index 3d9a964..2391414 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
@@ -70,7 +71,7 @@ public class SyncStrategy {
public SyncStrategy(CoreContainer cc) {
UpdateShardHandler updateShardHandler = cc.getUpdateShardHandler();
client = updateShardHandler.getDefaultHttpClient();
- shardHandler = cc.getShardHandlerFactory().getShardHandler();
+ shardHandler = ((HttpShardHandlerFactory)cc.getShardHandlerFactory()).getShardHandler(cc.getUpdateShardHandler().getDefaultHttpClient());
updateExecutor = updateShardHandler.getUpdateExecutor();
}
@@ -113,17 +114,18 @@ public class SyncStrategy {
private PeerSync.PeerSyncResult syncReplicas(ZkController zkController, SolrCore core,
ZkNodeProps leaderProps, boolean peerSyncOnlyWithActive) {
+ if (isClosed) {
+ log.info("We have been closed, won't sync with replicas");
+ return PeerSync.PeerSyncResult.failure();
+ }
boolean success = false;
PeerSync.PeerSyncResult result = null;
+ assert core != null;
+ assert core.getCoreDescriptor() != null;
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
String shardId = cloudDesc.getShardId();
- if (isClosed) {
- log.info("We have been closed, won't sync with replicas");
- return PeerSync.PeerSyncResult.failure();
- }
-
// first sync ourselves - we are the potential leader after all
try {
result = syncWithReplicas(zkController, core, leaderProps, collection,
@@ -160,6 +162,11 @@ public class SyncStrategy {
List<ZkCoreNodeProps> nodes = zkController.getZkStateReader()
.getReplicaProps(collection, shardId,core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+ if (isClosed) {
+ log.info("We have been closed, won't sync with replicas");
+ return PeerSync.PeerSyncResult.failure();
+ }
+
if (nodes == null) {
// I have no replicas
return PeerSync.PeerSyncResult.success();
@@ -184,6 +191,11 @@ public class SyncStrategy {
String shardId, ZkNodeProps leaderProps, CoreDescriptor cd,
int nUpdates) {
+ if (isClosed) {
+ log.info("We have been closed, won't sync replicas to me.");
+ return;
+ }
+
// sync everyone else
// TODO: we should do this in parallel at least
List<ZkCoreNodeProps> nodes = zkController
@@ -289,6 +301,11 @@ public class SyncStrategy {
}
@Override
public void run() {
+
+ if (isClosed) {
+ log.info("We have been closed, won't request recovery");
+ return;
+ }
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(coreName);
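
SyncStrategy now checks its closed flag at the start of each phase of the sync and recovery-request work instead of part-way through. A compact sketch of that early-return guard (not part of the commit; the class and method body are illustrative, while PeerSync.PeerSyncResult.failure() and success() are the real return values used in the diff):

import org.apache.solr.update.PeerSync;

// Illustrative guard: bail out before doing any work once close() has flipped the flag.
class SyncGuardSketch {
  private volatile boolean isClosed;

  PeerSync.PeerSyncResult syncReplicas() {
    if (isClosed) {
      return PeerSync.PeerSyncResult.failure(); // same early exit the diff moves to the top
    }
    // ... the actual peer sync work would follow here ...
    return PeerSync.PeerSyncResult.success();
  }
}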
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5caad81..32a030c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cloud;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -46,6 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -62,11 +64,13 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
@@ -90,6 +94,7 @@ import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.StrUtils;
@@ -102,6 +107,7 @@ import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrCoreInitializationException;
import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
+import org.apache.solr.handler.component.HttpShardHandler;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.servlet.SolrDispatchFilter;
@@ -137,7 +143,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
* <p>
* TODO: exceptions during close on attempts to update cloud state
*/
-public class ZkController {
+public class ZkController implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final int WAIT_DOWN_STATES_TIMEOUT_SECONDS = 60;
@@ -433,11 +439,14 @@ public class ZkController {
closeOutstandingElections(registerOnReconnect);
markAllAsNotLeader(registerOnReconnect);
}
- }, zkACLProvider);
+ }, zkACLProvider, new ConnectionManager.IsClosed() {
+
+ @Override
+ public boolean isClosed() {
+ return cc.isShutDown();
+ }});
+
- this.overseerJobQueue = Overseer.getStateUpdateQueue(zkClient);
- this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
- this.overseerConfigSetQueue = Overseer.getConfigSetQueue(zkClient);
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
@@ -448,6 +457,10 @@ public class ZkController {
});
init(registerOnReconnect);
+
+ this.overseerJobQueue = overseer.getStateUpdateQueue();
+ this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
+ this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
assert ObjectReleaseTracker.track(this);
}
@@ -554,42 +567,62 @@ public class ZkController {
*/
public void close() {
this.isClosed = true;
+
+ ForkJoinPool customThreadPool = new ForkJoinPool(10);
+
+ customThreadPool.submit(() -> Collections.singleton(overseerElector.getContext()).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
+ customThreadPool.submit(() -> Collections.singleton(overseer).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
synchronized (collectionToTerms) {
- collectionToTerms.values().forEach(ZkCollectionTerms::close);
+ customThreadPool.submit(() -> collectionToTerms.values().parallelStream().forEach(c -> {
+ c.close();
+ }));
}
try {
- for (ElectionContext context : electionContexts.values()) {
+
+ customThreadPool.submit(() -> replicateFromLeaders.values().parallelStream().forEach(c -> {
+ c.stopReplication();
+ }));
+
+ customThreadPool.submit(() -> electionContexts.values().parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
+ } finally {
+
+ customThreadPool.submit(() -> Collections.singleton(cloudSolrClient).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+ customThreadPool.submit(() -> Collections.singleton(cloudManager).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+
+ try {
try {
- context.close();
+ zkStateReader.close();
} catch (Exception e) {
- log.error("Error closing overseer", e);
+ log.error("Error closing zkStateReader", e);
}
- }
- } finally {
- try {
- IOUtils.closeQuietly(overseerElector.getContext());
- IOUtils.closeQuietly(overseer);
} finally {
- if (cloudSolrClient != null) {
- IOUtils.closeQuietly(cloudSolrClient);
- }
- if (cloudManager != null) {
- IOUtils.closeQuietly(cloudManager);
- }
try {
- try {
- zkStateReader.close();
- } catch (Exception e) {
- log.error("Error closing zkStateReader", e);
- }
+ zkClient.close();
+ } catch (Exception e) {
+ log.error("Error closing zkClient", e);
} finally {
- try {
- zkClient.close();
- } catch (Exception e) {
- log.error("Error closing zkClient", e);
- }
+
+ // just in case the OverseerElectionContext managed to start another Overseer
+ IOUtils.closeQuietly(overseer);
+
+ ExecutorUtil.shutdownAndAwaitTermination(customThreadPool);
}
+
}
+
}
assert ObjectReleaseTracker.release(this);
}
@@ -669,9 +702,11 @@ public class ZkController {
if (cloudManager != null) {
return cloudManager;
}
- cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkServerAddress), Optional.empty())
- .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient()).build();
+ cloudSolrClient = new CloudSolrClient.Builder(Collections.singletonList(zkServerAddress), Optional.empty())
+ .withHttpClient(cc.getUpdateShardHandler().getDefaultHttpClient())
+ .withConnectionTimeout(15000).withSocketTimeout(30000).build();
cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cloudSolrClient);
+ cloudManager.getClusterStateProvider().connect();
}
return cloudManager;
}
@@ -764,7 +799,8 @@ public class ZkController {
* @throws KeeperException if there is a Zookeeper error
* @throws InterruptedException on interrupt
*/
- public static void createClusterZkNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
+ public static void createClusterZkNodes(SolrZkClient zkClient)
+ throws KeeperException, InterruptedException, IOException {
ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
@@ -777,7 +813,7 @@ public class ZkController {
cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
- bootstrapDefaultConfigSet(zkClient);
+ bootstrapDefaultConfigSet(zkClient);
}
private static void bootstrapDefaultConfigSet(SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
@@ -839,7 +875,7 @@ public class ZkController {
// start the overseer first as the following code may need its processing
if (!zkRunOnly) {
overseerElector = new LeaderElector(zkClient);
- this.overseer = new Overseer(cc.getShardHandlerFactory().getShardHandler(), cc.getUpdateShardHandler(),
+ this.overseer = new Overseer((HttpShardHandler) cc.getShardHandlerFactory().getShardHandler(), cc.getUpdateShardHandler(),
CommonParams.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
ElectionContext context = new OverseerElectionContext(zkClient,
overseer, getNodeName());
@@ -911,10 +947,10 @@ public class ZkController {
LiveNodesListener listener = (oldNodes, newNodes) -> {
oldNodes.removeAll(newNodes);
if (oldNodes.isEmpty()) { // only added nodes
- return;
+ return false;
}
if (isClosed) {
- return;
+ return true;
}
// if this node is in the top three then attempt to create nodeLost message
int i = 0;
@@ -923,7 +959,7 @@ public class ZkController {
break;
}
if (i > 2) {
- return; // this node is not in the top three
+ return false; // this node is not in the top three
}
i++;
}
@@ -948,11 +984,17 @@ public class ZkController {
}
}
}
+ return false;
};
zkStateReader.registerLiveNodesListener(listener);
}
public void publishAndWaitForDownStates() throws KeeperException,
+ InterruptedException {
+ publishAndWaitForDownStates(WAIT_DOWN_STATES_TIMEOUT_SECONDS);
+ }
+
+ public void publishAndWaitForDownStates(int timeoutSeconds) throws KeeperException,
InterruptedException {
publishNodeAsDown(getNodeName());
@@ -983,7 +1025,7 @@ public class ZkController {
});
}
- boolean allPublishedDown = latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ boolean allPublishedDown = latch.await(timeoutSeconds, TimeUnit.SECONDS);
if (!allPublishedDown) {
log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
}
@@ -1051,10 +1093,13 @@ public class ZkController {
log.info("Remove node as live in ZooKeeper:" + nodePath);
List<Op> ops = new ArrayList<>(2);
ops.add(Op.delete(nodePath, -1));
- if (zkClient.exists(nodeAddedPath, true)) {
- ops.add(Op.delete(nodeAddedPath, -1));
+ ops.add(Op.delete(nodeAddedPath, -1));
+
+ try {
+ zkClient.multi(ops, true);
+ } catch (NoNodeException e) {
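+ // okay if the live node or the nodeAdded marker has already been removed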
+
}
- zkClient.multi(ops, true);
}
public String getNodeName() {
@@ -1158,6 +1203,10 @@ public class ZkController {
// TODO: should this actually be done earlier, before (or as part of)
// leader election perhaps?
+ if (core == null) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "SolrCore is no longer available to register");
+ }
+
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
if (isTlogReplicaAndNotLeader) {
@@ -1270,6 +1319,7 @@ public class ZkController {
final long msInSec = 1000L;
int maxTries = (int) Math.floor(leaderConflictResolveWait / msInSec);
while (!leaderUrl.equals(clusterStateLeaderUrl)) {
+ if (cc.isShutDown()) throw new AlreadyClosedException();
if (tries > maxTries) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"There is conflicting information about the leader of shard: "
@@ -1290,6 +1340,8 @@ public class ZkController {
.getCoreUrl();
}
+ } catch (AlreadyClosedException e) {
+ throw e;
} catch (Exception e) {
log.error("Error getting leader from zk", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -1336,7 +1388,7 @@ public class ZkController {
Thread.sleep(1000);
}
if (cc.isShutDown()) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "CoreContainer is closed");
+ throw new AlreadyClosedException();
}
}
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Could not get leader props", exp);
@@ -2392,6 +2444,9 @@ public class ZkController {
}
private boolean fireEventListeners(String zkDir) {
+ if (isClosed || cc.isShutDown()) {
+ return false;
+ }
synchronized (confDirectoryListeners) {
// if this is not among directories to be watched then don't set the watcher anymore
if (!confDirectoryListeners.containsKey(zkDir)) {
@@ -2527,15 +2582,17 @@ public class ZkController {
* @param nodeName to operate on
*/
public void publishNodeAsDown(String nodeName) {
- log.debug("Publish node={} as DOWN", nodeName);
+ log.info("Publish node={} as DOWN", nodeName);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
ZkStateReader.NODE_NAME_PROP, nodeName);
try {
- Overseer.getStateUpdateQueue(getZkClient()).offer(Utils.toJSON(m));
+ overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
+ } catch (AlreadyClosedException e) {
+ log.info("Not publishing node as DOWN because a resource required to do so is already closed.");
} catch (InterruptedException e) {
- Thread.interrupted();
+ Thread.currentThread().interrupt();
log.debug("Publish node as down was interrupted.");
- } catch (Exception e) {
+ } catch (KeeperException e) {
log.warn("Could not publish node as down: " + e.getMessage());
}
}
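Side note (not part of the patch): the ZkController constructor above now passes an extra ConnectionManager.IsClosed argument wired to cc.isShutDown(), so lower layers can stop retrying ZooKeeper operations once the container is shutting down instead of waiting out session timeouts. A minimal sketch of that shutdown-aware retry idea follows, assuming IsClosed is a single-method interface; the class and method names are illustrative, not the committed ZkCmdExecutor code:

  import org.apache.solr.common.AlreadyClosedException;
  import org.apache.solr.common.cloud.ConnectionManager.IsClosed;

  class ShutdownAwareRetry {
    // Runs zkOp up to the given number of attempts, bailing out early on shutdown.
    static void run(IsClosed higherLevelIsClosed, Runnable zkOp, int attempts) throws InterruptedException {
      for (int i = 0; i < attempts; i++) {
        if (higherLevelIsClosed != null && higherLevelIsClosed.isClosed()) {
          throw new AlreadyClosedException(); // the container is shutting down; stop retrying
        }
        try {
          zkOp.run();
          return;
        } catch (RuntimeException e) {
          if (e instanceof AlreadyClosedException) throw e; // do not swallow shutdown signals
          Thread.sleep(250); // brief pause before the next attempt
        }
      }
      // all attempts failed; a real implementation would rethrow the last error
    }
  }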
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 7acdfef..d3ce990 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
import org.apache.solr.common.util.Pair;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -113,11 +114,15 @@ public class ZkDistributedQueue implements DistributedQueue {
public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats) {
this(zookeeper, dir, stats, 0);
}
-
+
public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats, int maxQueueSize) {
+ this(zookeeper, dir, stats, maxQueueSize, null);
+ }
+
+ public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats, int maxQueueSize, IsClosed higherLevelIsClosed) {
this.dir = dir;
- ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
+ ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout(), higherLevelIsClosed);
try {
cmdExecutor.ensureExists(dir, zookeeper);
} catch (KeeperException e) {
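Usage note (illustrative): the extra IsClosed parameter is how a caller such as Overseer can thread its own closed-check down to the ZkCmdExecutor that ensures the queue directory exists. A hypothetical call site, assuming zkClient, stats, overseer and coreContainer are already in scope and that IsClosed is a functional interface:

  // Hypothetical call; the real wiring happens when Overseer builds its state update queue.
  ZkDistributedQueue stateQueue = new ZkDistributedQueue(zkClient, "/overseer/queue", stats, 0,
      () -> overseer.isClosed() || coreContainer.isShutDown());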
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index bcbb347..01fe62b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -313,29 +313,24 @@ public class ZkShardTerms implements AutoCloseable{
* Create correspond ZK term node
*/
private void ensureTermNodeExist() {
- String path = "/collections/"+collection+ "/terms";
+ String path = "/collections/" + collection + "/terms";
try {
- if (!zkClient.exists(path, true)) {
- try {
- zkClient.makePath(path, true);
- } catch (KeeperException.NodeExistsException e) {
- // it's okay if another beats us creating the node
- }
- }
- path += "/"+shard;
- if (!zkClient.exists(path, true)) {
- try {
- Map<String, Long> initialTerms = new HashMap<>();
- zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
- } catch (KeeperException.NodeExistsException e) {
- // it's okay if another beats us creating the node
- }
+ path += "/" + shard;
+
+ try {
+ Map<String,Long> initialTerms = new HashMap<>();
+ zkClient.makePath(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException e) {
+ // it's okay if another beats us creating the node
}
- } catch (InterruptedException e) {
+
+ } catch (InterruptedException e) {
Thread.interrupted();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection: " + collection, e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Error creating shard term node in Zookeeper for collection: " + collection, e);
} catch (KeeperException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection: " + collection, e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Error creating shard term node in Zookeeper for collection: " + collection, e);
}
}
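The rewrite above drops the exists()-then-create() dance in favor of a single makePath() that is allowed to fail with NodeExistsException, avoiding the check-then-create race; and because makePath creates missing parent nodes, the separate creation of the /collections/<collection>/terms parent is no longer needed. A minimal sketch of the idiom, written as it would appear inside ZkShardTerms (where zkClient, Utils, CreateMode and KeeperException are already imported) and using a placeholder path:

  // Create-first idiom: attempt the create and treat "already exists" as success.
  try {
    zkClient.makePath("/collections/mycoll/terms/shard1",
        Utils.toJSON(new HashMap<String, Long>()), CreateMode.PERSISTENT, true);
  } catch (KeeperException.NodeExistsException e) {
    // another replica created it first -- that is fine
  }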
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 8b72cdf..a0abaf0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -245,7 +245,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
}
try {
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
}
@@ -328,6 +328,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
}
+ log.info("Returning CreateReplica command.");
return new CreateReplica(collection, shard, node, replicaType, coreName, coreNodeName);
}
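Here and in the other command classes below, the static Overseer.getStateUpdateQueue(zkClient) lookup is replaced by a call through the live Overseer instance held by the OverseerCollectionMessageHandler (ocmh.overseer). Presumably offerStateUpdate is a thin wrapper over that instance's state update queue; a hedged sketch of its likely shape (not the verbatim committed method):

  // Presumed shape of the new helper on Overseer (illustrative).
  public void offerStateUpdate(byte[] data) throws KeeperException, InterruptedException {
    getStateUpdateQueue().offer(data);
  }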
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index fd09a3f..318cdf7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -115,7 +115,7 @@ public class Assign {
} catch (IOException | KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error inc and get counter from Zookeeper for collection:"+collection, e);
} catch (InterruptedException e) {
- Thread.interrupted();
+ Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error inc and get counter from Zookeeper for collection:" + collection, e);
}
}
@@ -182,21 +182,34 @@ public class Assign {
return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
}
- private static int defaultCounterValue(DocCollection collection, boolean newCollection) {
+ private static int defaultCounterValue(DocCollection collection, boolean newCollection, String shard) {
if (newCollection) return 0;
- int defaultValue = collection.getReplicas().size();
+
+ int defaultValue;
+ if (collection.getSlice(shard) != null && collection.getSlice(shard).getReplicas().isEmpty()) {
+ return 0;
+ } else {
+ defaultValue = collection.getReplicas().size() * 2;
+ }
+
if (collection.getReplicationFactor() != null) {
// numReplicas and replicationFactor * numSlices may differ,
// e.g. when many addReplica or deleteReplica operations have been executed
defaultValue = Math.max(defaultValue,
collection.getReplicationFactor() * collection.getSlices().size());
}
- return defaultValue * 20;
+ return defaultValue;
+ }
+
+ private static int defaultCounterValue(DocCollection collection, boolean newCollection) {
+ if (newCollection) return 0;
+ int defaultValue = collection.getReplicas().size();
+ return defaultValue;
}
public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type, boolean newCollection) {
Slice slice = collection.getSlice(shard);
- int defaultValue = defaultCounterValue(collection, newCollection);
+ int defaultValue = defaultCounterValue(collection, newCollection, shard);
int replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
String coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
while (existCoreName(coreName, slice)) {
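A quick worked example of the new shard-aware default (numbers are purely illustrative): for an existing collection with 4 replicas in total, replicationFactor=2 and 2 slices, a request against a non-empty shard now starts the counter at max(4 * 2, 2 * 2) = 8, whereas the old code would have started it at max(4, 4) * 20 = 80; and if the target shard exists but currently has no replicas, the counter starts at 0 so core numbering can restart from the bottom.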
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index b8aba76..fd9faad 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -160,7 +160,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
String backupName = request.getStr(NAME);
String asyncId = request.getStr(ASYNC);
String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
Map<String, String> requestMap = new HashMap<>();
String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 533aee8..0f5e41a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -155,8 +155,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
createCollectionZkNode(stateManager, collectionName, collectionParams);
-
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
// wait for a while until we see the collection
TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
@@ -195,7 +195,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
collectionName, shardNames, message));
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
for (ReplicaPosition replicaPosition : replicaPositions) {
String nodeName = replicaPosition.node;
@@ -235,7 +235,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ZkStateReader.BASE_URL_PROP, baseUrl,
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
}
// Need to create new params for each request
@@ -308,7 +308,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
ZkStateReader.COLLECTION_PROP, withCollection,
CollectionAdminParams.COLOCATED_WITH, collectionName);
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
try {
zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
} catch (TimeoutException e) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index e7f35f1..229b799 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -21,7 +21,6 @@ import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
-import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -71,7 +70,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
ZkStateReader zkStateReader = ocmh.zkStateReader;
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
// wait for a while until we see the shard
ocmh.waitForNewShard(collectionName, sliceName);
String async = message.getStr(ASYNC);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
index 32715d6..8a091ef 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -84,7 +84,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
Map<String, String> requestMap = new HashMap<>();
NamedList shardRequestResults = new NamedList();
Map<String, Slice> shardByCoreName = new HashMap<>();
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
for (Replica replica : slice.getReplicas()) {