You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/03 18:47:03 UTC
[lucene-solr] 03/08: @1319 Operation to op and a bit of cleanup.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 50af49f16b94f306474ff37182178c46cd94f49c
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Feb 3 01:28:41 2021 -0600
@1319 Operation to op and a bit of cleanup.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 2 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 3 +-
.../solr/cloud/ShardLeaderElectionContext.java | 8 +-
.../java/org/apache/solr/cloud/StatePublisher.java | 97 ++++++++++++----------
.../java/org/apache/solr/cloud/SyncStrategy.java | 5 --
.../java/org/apache/solr/cloud/ZkController.java | 2 +-
.../solr/cloud/api/collections/AddReplicaCmd.java | 2 +-
.../solr/cloud/api/collections/BackupCmd.java | 3 +-
.../cloud/api/collections/CreateCollectionCmd.java | 2 +-
.../solr/cloud/api/collections/CreateShardCmd.java | 3 +-
.../cloud/api/collections/CreateSnapshotCmd.java | 3 +-
.../cloud/api/collections/DeleteCollectionCmd.java | 3 +-
.../solr/cloud/api/collections/DeleteNodeCmd.java | 3 +-
.../cloud/api/collections/DeleteReplicaCmd.java | 3 +-
.../solr/cloud/api/collections/DeleteShardCmd.java | 2 +-
.../cloud/api/collections/DeleteSnapshotCmd.java | 3 +-
.../solr/cloud/api/collections/MigrateCmd.java | 10 +--
.../OverseerCollectionMessageHandler.java | 2 +-
.../solr/cloud/api/collections/ReplaceNodeCmd.java | 5 +-
.../solr/cloud/api/collections/RestoreCmd.java | 4 +-
.../solr/cloud/api/collections/SplitShardCmd.java | 6 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 11 +--
.../java/org/apache/solr/core/CoreContainer.java | 39 ++++-----
.../src/java/org/apache/solr/core/SolrCore.java | 89 ++++++++------------
.../src/java/org/apache/solr/core/SolrCores.java | 29 ++++---
.../apache/solr/servlet/SolrDispatchFilter.java | 2 +-
.../solr/handler/admin/TestCollectionAPIs.java | 50 +++++------
.../apache/solr/handler/admin/TestConfigsApi.java | 4 +-
.../java/org/apache/solr/common/cloud/Replica.java | 4 +
29 files changed, 195 insertions(+), 204 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 8bedc7f..0be94c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -134,7 +134,7 @@ import java.util.function.BiConsumer;
* </ol>
*/
public class Overseer implements SolrCloseable {
- public static final String QUEUE_OPERATION = "operation";
+ public static final String QUEUE_OPERATION = "op";
public static final String OVERSEER_COLLECTION_QUEUE_WORK = "/overseer/collection-queue-work";
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index d48d4d7..251e931 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -178,6 +178,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
final public void close() {
close = true;
+ if (log.isDebugEnabled()) log.debug("Stopping recovery for core=[{}]", coreName);
+
try {
if (prevSendPreRecoveryHttpUriRequest != null) {
prevSendPreRecoveryHttpUriRequest.cancel();
@@ -196,7 +198,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
latch.countDown();
}
- log.warn("Stopping recovery for core=[{}]", coreName);
//ObjectReleaseTracker.release(this);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index dab1314..e87e1a1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -95,7 +95,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
String coreName = leaderProps.getName();
- log.info("Run leader process for shard [{}] election, first step is to try and sync with the shard core={}", context.leaderProps.getSlice(), coreName);
+ log.info("Run leader process for shard [{}] election, first step is to try and sync with the shard", context.leaderProps.getSlice());
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
log.info("No SolrCore found, cannot become leader {}", coreName);
@@ -169,8 +169,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
success = result.isSuccess();
if (!success) {
-
- log.warn("Our sync attempt failed");
boolean hasRecentUpdates = false;
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
@@ -195,8 +193,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
success = true;
}
}
- } else {
- log.info("Our sync attempt succeeded");
}
// solrcloud_debug
// if (log.isDebugEnabled()) {
@@ -250,7 +246,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
ZkNodeProps zkNodes = ZkNodeProps
- .fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.CORE_NAME_PROP, leaderProps.getName(),
+ .fromKeyVals(StatePublisher.OPERATION, OverseerAction.STATE.toLower(), ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.CORE_NAME_PROP, leaderProps.getName(),
ZkStateReader.STATE_PROP, "leader");
log.info("I am the new leader, publishing as active: " + leaderProps.getCoreUrl() + " " + shardId);
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 0d5ed61..76b9569 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -34,6 +34,7 @@ import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashSet;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -44,8 +45,9 @@ import java.util.concurrent.TimeUnit;
public class StatePublisher implements Closeable {
private static final Logger log = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
+ public static final String OPERATION = "op";
- private final Map<String,String> stateCache = new ConcurrentHashMap<>(32, 0.75f, 4);
+ private final Map<String,String> stateCache = new ConcurrentHashMap<>(32, 0.75f, 6);
private final ZkStateReader zkStateReader;
private final CoreContainer cc;
@@ -54,7 +56,7 @@ public class StatePublisher implements Closeable {
public static final NoOpMessage TERMINATE_OP = new NoOpMessage();
- private final ArrayBlockingQueue<ZkNodeProps> workQueue = new ArrayBlockingQueue(32, true);
+ private final ArrayBlockingQueue<ZkNodeProps> workQueue = new ArrayBlockingQueue(64, true);
private final ZkDistributedQueue overseerJobQueue;
private volatile Worker worker;
private volatile Future<?> workerFuture;
@@ -83,7 +85,7 @@ public class StatePublisher implements Closeable {
throttle.markAttemptingAction();
ZkNodeProps message = null;
ZkNodeProps bulkMessage = new ZkNodeProps();
- bulkMessage.getProperties().put("operation", "state");
+ bulkMessage.getProperties().put(OPERATION, "state");
try {
try {
message = workQueue.poll(15, TimeUnit.SECONDS);
@@ -128,12 +130,12 @@ public class StatePublisher implements Closeable {
}
private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) throws KeeperException, InterruptedException {
- if (OverseerAction.get(zkNodeProps.getStr("operation")) == OverseerAction.DOWNNODE) {
+ if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.DOWNNODE) {
String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
bulkMessage.getProperties().put(OverseerAction.DOWNNODE.toLower(), nodeName);
clearStatesForNode(bulkMessage, nodeName);
- } else if (OverseerAction.get(zkNodeProps.getStr("operation")) == OverseerAction.RECOVERYNODE) {
+ } else if (OverseerAction.get(zkNodeProps.getStr(OPERATION)) == OverseerAction.RECOVERYNODE) {
String nodeName = zkNodeProps.getStr(ZkStateReader.NODE_NAME_PROP);
bulkMessage.getProperties().put(OverseerAction.RECOVERYNODE.toLower(), nodeName);
@@ -148,7 +150,7 @@ public class StatePublisher implements Closeable {
return;
}
- bulkMessage.getProperties().put(core, collection + "," + state);
+ bulkMessage.getProperties().put(core, collection + "," + Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT))));
}
}
@@ -177,7 +179,7 @@ public class StatePublisher implements Closeable {
private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
byte[] updates = Utils.toJSON(message);
- log.info("Send state updates to Overseer {}", message);
+ if (log.isDebugEnabled()) log.debug("Send state updates to Overseer {}", message);
overseerJobQueue.offer(updates);
}
}
@@ -189,53 +191,58 @@ public class StatePublisher implements Closeable {
}
public void submitState(ZkNodeProps stateMessage) {
- // Don't allow publish of state we last published if not DOWNNODE
- if (stateMessage != TERMINATE_OP) {
- String operation = stateMessage.getStr("operation");
- if (operation.equals("state")) {
- String core = stateMessage.getStr(ZkStateReader.CORE_NAME_PROP);
- String state = stateMessage.getStr(ZkStateReader.STATE_PROP);
- String collection = stateMessage.getStr(ZkStateReader.COLLECTION_PROP);
-
- DocCollection coll = zkStateReader.getClusterState().getCollectionOrNull(collection);
- if (coll != null) {
- Replica replica = coll.getReplica(core);
- String lastState = stateCache.get(core);
- // nocommit
- if (collection != null && replica != null && !state.equals(Replica.State.ACTIVE) && state.equals(lastState) && replica.getState().toString().equals(state)) {
- log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
+ // Don't allow publish of state we last published if not DOWNNODE?
+ try {
+ if (stateMessage != TERMINATE_OP) {
+ String operation = stateMessage.getStr(OPERATION);
+ if (operation.equals("state")) {
+ String core = stateMessage.getStr(ZkStateReader.CORE_NAME_PROP);
+ String state = stateMessage.getStr(ZkStateReader.STATE_PROP);
+ String collection = stateMessage.getStr(ZkStateReader.COLLECTION_PROP);
+
+ DocCollection coll = zkStateReader.getClusterState().getCollectionOrNull(collection);
+ if (coll != null) {
+ Replica replica = coll.getReplica(core);
+ String lastState = stateCache.get(core);
// nocommit
- return;
+ if (collection != null && replica != null && !state.equals(Replica.State.ACTIVE) && state.equals(lastState) && replica.getState().toString().equals(state)) {
+ log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
+ // nocommit
+ return;
+ }
+ }
+ if (core == null || state == null) {
+ log.error("Nulls in published state");
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Nulls in published state " + stateMessage);
}
- }
- if (core == null || state == null) {
- log.error("Nulls in published state");
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Nulls in published state " + stateMessage);
- }
- stateCache.put(core, state);
- } else if (operation.equalsIgnoreCase(OverseerAction.DOWNNODE.toLower())) {
- // set all statecache entries for replica to a state
+ stateCache.put(core, state);
+ } else if (operation.equalsIgnoreCase(OverseerAction.DOWNNODE.toLower())) {
+ // set all statecache entries for replica to a state
- Collection<String> coreNames = cc.getAllCoreNames();
- for (String core : coreNames) {
- stateCache.put(core, Replica.State.getShortState(Replica.State.DOWN));
- }
+ Collection<String> coreNames = cc.getAllCoreNames();
+ for (String core : coreNames) {
+ stateCache.put(core, Replica.State.getShortState(Replica.State.DOWN));
+ }
- } else if (operation.equalsIgnoreCase(OverseerAction.RECOVERYNODE.toLower())) {
- // set all statecache entries for replica to a state
+ } else if (operation.equalsIgnoreCase(OverseerAction.RECOVERYNODE.toLower())) {
+ // set all statecache entries for replica to a state
- Collection<String> coreNames = cc.getAllCoreNames();
- for (String core : coreNames) {
- stateCache.put(core, Replica.State.getShortState(Replica.State.RECOVERING));
- }
+ Collection<String> coreNames = cc.getAllCoreNames();
+ for (String core : coreNames) {
+ stateCache.put(core, Replica.State.getShortState(Replica.State.RECOVERING));
+ }
- } else {
- throw new IllegalArgumentException(stateMessage.toString());
+ } else {
+ throw new IllegalArgumentException(stateMessage.toString());
+ }
}
- }
- workQueue.offer(stateMessage);
+ workQueue.offer(stateMessage);
+ } catch (Exception e) {
+ log.error("Exception trying to publish state message={}", stateMessage, e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
}
public void clearStatCache(String core) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
index c6d331c..44cdd3b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
@@ -71,11 +71,6 @@ public class SyncStrategy implements Closeable {
return PeerSync.PeerSyncResult.success();
}
- if (log.isInfoEnabled()) {
- log.info("Sync replicas to {}", leaderProps.getCoreUrl());
- }
-
-
this.zkController = zkController;
return syncReplicas(zkController, core, leaderProps, peerSyncOnlyWithActive);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 0aa92a7..bc4ad9e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -2448,7 +2448,7 @@ public class ZkController implements Closeable, Runnable {
return;
}
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, state.toLower(),
+ ZkNodeProps m = new ZkNodeProps(StatePublisher.OPERATION, state.toLower(),
ZkStateReader.NODE_NAME_PROP, nodeName);
try {
statePublisher.submitState(m);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index d5720fe..d38b669 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -101,7 +101,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
final String asyncId = message.getStr(ASYNC);
shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
createdShardHandler = true;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index 2ccf0a9..482aa57 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -31,6 +31,7 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.lucene.util.Version;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -202,7 +203,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
shardsToConsider = snapshotMeta.get().getShards();
}
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, request.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, request.getStr(Overseer.QUEUE_OPERATION));
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
Replica replica = null;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 7b6858e..75126a9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -165,7 +165,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection alias already exists: " + collectionName);
}
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(async, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(async, message.getStr(Overseer.QUEUE_OPERATION));
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
String withCollection = message.getStr(CollectionAdminParams.WITH_COLLECTION);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 3c6eee1..bb1d69b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cloud.api.collections;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
@@ -106,7 +107,7 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
final String asyncId = message.getStr(ASYNC);
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
final NamedList addResult = new NamedList();
AddReplicaCmd.Response resp;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
index 0487785..ba3cf3e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -98,7 +99,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
Map<String, Slice> shardByCoreName = new HashMap<>();
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
for (Replica replica : slice.getReplicas()) {
if (replica.getState() != State.ACTIVE) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 29dfd01..b3fefb7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -18,6 +18,7 @@
package org.apache.solr.cloud.api.collections;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.common.NonExistentCoreException;
import org.apache.solr.common.SolrException;
@@ -145,7 +146,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- shardRequestTracker = new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader, ocmh.shardHandlerFactory, ocmh.overseer);
+ shardRequestTracker = new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION), ocmh.adminPath, zkStateReader, ocmh.shardHandlerFactory, ocmh.overseer);
@SuppressWarnings({"unchecked"}) List<Replica> notLifeReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions, shardHandler, shardRequestTracker);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
index 2dfcc60..42056dc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteNodeCmd.java
@@ -22,6 +22,7 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -62,7 +63,7 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
results.add("failure", "Can't delete the only existing non-PULL replica(s) on node " + node + ": " + singleReplicas.toString());
} else {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(message.getStr("async"), message.getStr("operation"));
+ OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(message.getStr("async"), message.getStr(Overseer.QUEUE_OPERATION));
resp = cleanupReplicas(results, state, sourceReplicas, ocmh, node, message.getStr(ASYNC), shardHandler, shardRequestTracker);
AddReplicaCmd.Response response = new AddReplicaCmd.Response();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 3996fe9..4bd043c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.SliceMutator;
@@ -80,7 +81,7 @@ public class DeleteReplicaCmd implements Cmd {
if (!onlyUpdateState) {
String asyncId = message.getStr(ASYNC);
shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
createdShardHandler = true;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index e342bcd..0830ab9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -107,7 +107,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
List<OverseerCollectionMessageHandler.Finalize> finalizers = new ArrayList<>();
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(message.getStr("async"), message.getStr("operation"));
+ OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(message.getStr("async"), message.getStr(Overseer.QUEUE_OPERATION));
try {
List<ZkNodeProps> replicas = getReplicasForSlice(collectionName, slice);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
index 912cf4c..99b9b06 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -101,7 +102,7 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores);
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
for (Replica replica : slice.getReplicas()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index bc62dd6..18d7983 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -188,7 +188,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
{
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates");
@@ -286,7 +286,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
String tempNodeName = sourceLeader.getNodeName();
{
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
shardRequestTracker.sendShardRequest(tempNodeName, params, shardHandler);
shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command");
}
@@ -339,7 +339,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
params = new ModifiableSolrParams(cmd.getParams());
{
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
shardRequestTracker.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler);
shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
@@ -354,7 +354,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
{
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
@@ -367,7 +367,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
{
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
shardRequestTracker.sendShardRequest(targetLeader.getNodeName(), params, shardHandler);
shardRequestTracker.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates");
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index cc92226..56fdb00 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -802,7 +802,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
if (coll == null) return null;
List<Replica> notLivesReplicas = new ArrayList<>();
if (shardRequestTracker == null) {
- shardRequestTracker = new ShardRequestTracker(asyncId, message.getStr("operation"), adminPath, zkStateReader, shardHandlerFactory, overseer);
+ shardRequestTracker = new ShardRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION), adminPath, zkStateReader, shardHandlerFactory, overseer);
}
for (Slice slice : coll.getSlices()) {
notLivesReplicas.addAll(shardRequestTracker.sliceCmd(params, stateMatcher, slice, shardHandler));
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index 8531923..f9d6b5b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -18,6 +18,7 @@
package org.apache.solr.cloud.api.collections;
import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -95,7 +96,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(async, message.getStr("operation"));
+ OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(async, message.getStr(Overseer.QUEUE_OPERATION));
for (ZkNodeProps sourceReplica : sourceReplicas) {
@SuppressWarnings({"rawtypes"}) NamedList nl = new NamedList();
String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
@@ -200,7 +201,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
//now cleanup the replicas in the source node
try {
ShardHandler sh = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- OverseerCollectionMessageHandler.ShardRequestTracker srt = ocmh.asyncRequestTracker(message.getStr("async"), message.getStr("operation"));
+ OverseerCollectionMessageHandler.ShardRequestTracker srt = ocmh.asyncRequestTracker(message.getStr("async"), message.getStr(Overseer.QUEUE_OPERATION));
log.info("Cleanup replicas {}", sourceReplicas);
AddReplicaCmd.Response r = DeleteNodeCmd.cleanupReplicas(results, finalClusterState, sourceReplicas, ocmh, source, null, sh, srt);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 54c97b8..de2466d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -303,7 +303,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
{
- ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
// Copy data from backed up index to each replica
for (Slice slice : restoreCollection.getSlices()) {
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -317,7 +317,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
}
{
- ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
for (Slice s : restoreCollection.getSlices()) {
for (Replica r : s.getReplicas()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 6c378f2..812a9c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -370,7 +370,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
t.stop();
t = timings.sub("waitForSubSliceLeadersAlive");
{
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
for (String subShardName : subShardNames) {
// wait for parent leader to acknowledge the sub-shard core
log.info("Asking parent leader to wait for: {} to be alive on: {}", subShardName, nodeName);
@@ -415,7 +415,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
t = timings.sub("splitParentCore");
{
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
shardRequestTracker.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler);
String msgOnError = "SPLITSHARD failed to invoke SPLIT core admin command";
@@ -431,7 +431,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
t = timings.sub("applyBufferedUpdates");
// apply buffered updates on sub-shards
{
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr(Overseer.QUEUE_OPERATION));
for (int i = 0; i < subShardNames.size(); i++) {
String subShardName = subShardNames.get(i);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 37f33ce..efc6e98 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.StatePublisher;
import org.apache.solr.cloud.Stats;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
@@ -176,7 +177,7 @@ public class ZkStateWriter {
this.cs = clusterState;
} else {
- final String operation = message.getStr(Overseer.QUEUE_OPERATION);
+ final String operation = message.getStr(StatePublisher.OPERATION);
OverseerAction overseerAction = OverseerAction.get(operation);
if (overseerAction == null) {
throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
@@ -184,8 +185,8 @@ public class ZkStateWriter {
switch (overseerAction) {
case STATE:
- log.info("state cmd {}", message);
- message.getProperties().remove("operation");
+ if (log.isDebugEnabled()) log.debug("state cmd {}", message);
+ message.getProperties().remove(StatePublisher.OPERATION);
for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
if (OverseerAction.DOWNNODE.equals(OverseerAction.get(entry.getKey()))) {
@@ -215,7 +216,7 @@ public class ZkStateWriter {
}
String collection = collectionAndState[0];
- String setState = collectionAndState[1];
+ String setState = Replica.State.shortStateToState(collectionAndState[1]).toString();
if (trackVersions.get(collection) == null) {
reader.forciblyRefreshClusterStateSlow(collection);
@@ -288,7 +289,7 @@ public class ZkStateWriter {
case UPDATESHARDSTATE:
String collection = message.getStr("collection");
message.getProperties().remove("collection");
- message.getProperties().remove("operation");
+ message.getProperties().remove(StatePublisher.OPERATION);
DocCollection docColl = cs.getCollectionOrNull(collection);
if (docColl != null) {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 03b348a..8d2ba45 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -911,7 +911,6 @@ public class CoreContainer implements Closeable {
log.error("Timeout", e);
}
}
- getZkController().getZkClient().printLayout();
}
for (final CoreDescriptor cd : cds) {
@@ -1279,10 +1278,14 @@ public class CoreContainer implements Closeable {
if (closeOld) {
if (old != null) {
SolrCore finalCore = old;
- Future<?> future = solrCoreExecutor.submit(() -> {
- log.info("Closing replaced core {}", cd.getName());
+ try {
+ Future<?> future = solrCoreExecutor.submit(() -> {
+ log.info("Closing replaced core {}", cd.getName());
+ finalCore.closeAndWait();
+ });
+ } catch (RejectedExecutionException e) {
finalCore.closeAndWait();
- });
+ }
}
}
return old;
@@ -1791,13 +1794,7 @@ public class CoreContainer implements Closeable {
}
newCore = core.reload(coreConfig);
- if (newCore == null) {
- return;
- }
-
if (isShutDown()) {
- IOUtils.closeQuietly(core);
- IOUtils.closeQuietly(newCore);
throw new AlreadyClosedException();
}
@@ -1851,7 +1848,7 @@ public class CoreContainer implements Closeable {
}
} catch (SolrCoreState.CoreIsClosedException e) {
-
+ log.error("Core is closed", e);
throw e;
} catch (Exception e) {
ParWork.propagateInterrupt("Exception reloading SolrCore", e);
@@ -1866,21 +1863,17 @@ public class CoreContainer implements Closeable {
throw exp;
} finally {
- if (isShutDown()) {
- IOUtils.closeQuietly(core);
- IOUtils.closeQuietly(oldCore);
- }
-
- if (!success) {
- log.error("Failed reloading core, cleaning up new core");
+ if (!success && newCore != null) {
+ log.warn("Failed reloading core, cleaning up new core");
SolrCore finalNewCore = newCore;
- solrCoreExecutor.submit(() -> {
- // try {
- if (finalNewCore != null) {
+ try {
+ solrCoreExecutor.submit(() -> {
log.error("Closing failed new core");
finalNewCore.closeAndWait();
- }
- });
+ });
+ } catch (RejectedExecutionException e) {
+ finalNewCore.closeAndWait();
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 40fcfb5..28ff855 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -172,6 +172,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -720,16 +721,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
boolean locked = lock.tryLock();
try {
- if (!locked && reloadyWaiting.get() > 1) {
- log.info("Skipping recovery because there is another already queued");
- lock.lockInterruptibly();
- lock.unlock();
- return null;
- }
- // if (closed || prepForClose) {
- // return;
- // }
if (!locked) {
reloadyWaiting.incrementAndGet();
log.info("Wait for reload lock");
@@ -741,16 +733,6 @@ public final class SolrCore implements SolrInfoBean, Closeable {
throw new AlreadyClosedException();
}
}
- // don't use recoveryLock.getQueueLength() for this
- if (reloadyWaiting.decrementAndGet() > 0) {
- // another recovery waiting behind us, let it run now instead of after we finish
- log.info("Skipping reload because there is another in line behind");
- lock.unlock();
- Thread.sleep(10);
- lock.lock();
- lock.unlock();
- return null;
- }
}
final SolrCore currentCore;
@@ -766,7 +748,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
try {
CoreDescriptor cd = getCoreDescriptor();
cd.loadExtraProperties(); //Reload the extra properties
- coreMetricManager.close();
+ // coreMetricManager.close();
if (coreContainer.isShutDown()) {
throw new AlreadyClosedException();
}
@@ -774,7 +756,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
core.start();
// we open a new IndexWriter to pick up the latest config
core.getUpdateHandler().getSolrCoreState().newIndexWriter(core, false);
- core.getSearcher(true, false, null, true);
+ // core.getSearcher(true, false, null, true);
success = true;
return core;
} finally {
@@ -782,14 +764,18 @@ public final class SolrCore implements SolrInfoBean, Closeable {
if (!success) {
if (core != null) {
SolrCore finalCore = core;
-// coreContainer.solrCoreCloseExecutor.submit(() -> {
-// try {
-// log.error("Closing failed SolrCore from failed reload");
- finalCore.closeAndWait();
-// } catch (Exception e) {
-// log.error("Exception waiting for core to close on reload failure", e);
-// }
-// });
+ try {
+ coreContainer.solrCoreExecutor.submit(() -> {
+ try {
+ log.warn("Closing failed SolrCore from failed reload");
+ finalCore.closeAndWait();
+ } catch (Exception e) {
+ log.error("Exception waiting for core to close on reload failure", e);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ finalCore.closeAndWait();
+ }
}
}
}
@@ -1725,9 +1711,6 @@ public final class SolrCore implements SolrInfoBean, Closeable {
}
int count = refCount.decrementAndGet();
- synchronized (closeAndWait) {
- closeAndWait.notifyAll();
- }
// if (log.isDebugEnabled()) {
// RuntimeException e = new RuntimeException();
@@ -1739,12 +1722,23 @@ public final class SolrCore implements SolrInfoBean, Closeable {
// log.debug("close refcount after {} {}", this, count);
// }
- if (count == 0 || coreContainer.solrCores.isClosed() && count == 1) {
+ if (count == 0) {
try {
- doClose();
- } catch (Exception e1) {
- log.error("Exception waiting for core to close", e1);
+ coreContainer.solrCoreExecutor.submit(() -> {
+ try {
+ doClose();
+ } catch (Exception e1) {
+ log.error("Exception closing SolrCore", e1);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ try {
+ doClose();
+ } catch (Exception e1) {
+ log.error("Exception closing SolrCore", e1);
+ }
}
+
return;
}
@@ -1775,7 +1769,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
if (cnt >= 2 && !closing) {
close();
}
- log.warn("close count is {} {} closing={} isClosed={}", name, refCount.get(), closing, isClosed);
+ if (log.isTraceEnabled()) log.trace("close count is {} {} closing={} isClosed={}", name, refCount.get(), closing, isClosed);
} catch (InterruptedException e) {
}
@@ -1787,29 +1781,18 @@ public final class SolrCore implements SolrInfoBean, Closeable {
}
void doClose() {
- ReentrantLock reloadLock = null;
- try {
- if (getUpdateHandler() != null && getUpdateHandler().getSolrCoreState() != null) {
- reloadLock = getUpdateHandler().getSolrCoreState().getReloadLock();
- try {
- reloadLock.lockInterruptibly();
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new AlreadyClosedException(e);
- }
- }
+ try {
if (closing) {
- while (refCount.get() != -1) {
+ while (!isClosed) {
synchronized (closeAndWait) {
try {
- closeAndWait.wait(250);
+ closeAndWait.wait(500);
} catch (InterruptedException e) {
}
}
}
-
return;
}
@@ -1831,8 +1814,6 @@ public final class SolrCore implements SolrInfoBean, Closeable {
}
}
- this.isClosed = true;
-
if (coreContainer.isZooKeeperAware()) {
coreContainer.getZkController().removeShardLeaderElector(name);
}
@@ -1995,8 +1976,8 @@ public final class SolrCore implements SolrInfoBean, Closeable {
}
} finally {
if (log.isDebugEnabled()) log.debug("close done refcount {} {}", refCount == null ? null : refCount.get(), name);
+ this.isClosed = true;
refCount.set(-1);
- if (reloadLock != null && reloadLock.isHeldByCurrentThread()) reloadLock.unlock();
infoRegistry.clear();
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java
index 660b40e..089f2f0 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCores.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
@@ -115,18 +116,22 @@ class SolrCores implements Closeable {
}
cores.forEach((s, solrCore) -> {
- container.solrCoreExecutor.submit(() -> {
- MDCLoggingContext.setCoreName(solrCore.getName());
- try {
- solrCore.closeAndWait();
- } catch (Throwable e) {
- log.error("Error closing SolrCore", e);
- ParWork.propagateInterrupt("Error shutting down core", e);
- } finally {
- MDCLoggingContext.clear();
- }
- return solrCore;
- });
+ try {
+ container.solrCoreExecutor.submit(() -> {
+ MDCLoggingContext.setCoreName(solrCore.getName());
+ try {
+ solrCore.closeAndWait();
+ } catch (Throwable e) {
+ log.error("Error closing SolrCore", e);
+ ParWork.propagateInterrupt("Error shutting down core", e);
+ } finally {
+ MDCLoggingContext.clear();
+ }
+ return solrCore;
+ });
+ } catch (RejectedExecutionException e) {
+ solrCore.closeAndWait();
+ }
});
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
index dc57a1c..f866a15 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java
@@ -113,7 +113,7 @@ public class SolrDispatchFilter extends BaseSolrFilter {
log.warn("expected pre init of factories {} {} {} {} {} {} {}",
FieldTypeXmlAdapter.dbf, XMLResponseParser.inputFactory, XMLResponseParser.saxFactory,
AnnotatedApi.MAPPER, org.apache.http.conn.util.PublicSuffixMatcherLoader.getDefault(),
- ValueSourceParser.standardValueSourceParsers);
+ ValueSourceParser.standardValueSourceParsers.getClass().getSimpleName());
}
private volatile StopRunnable stopRunnable;
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
index f9ac88d..eeb60f1 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
@@ -88,67 +88,67 @@ public class TestCollectionAPIs extends SolrTestCaseJ4 {
//test a simple create collection call
compareOutput(apiBag, "/collections", POST,
"{create:{name:'newcoll', config:'schemaless', numShards:2, replicationFactor:2 }}", null,
- "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create}");
+ "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', op:create}");
compareOutput(apiBag, "/collections", POST,
"{create:{name:'newcoll', config:'schemaless', numShards:2, nrtReplicas:2 }}", null,
- "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', collection.configName:schemaless, numShards:'2', operation:create}");
+ "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', collection.configName:schemaless, numShards:'2', op:create}");
compareOutput(apiBag, "/collections", POST,
"{create:{name:'newcoll', config:'schemaless', numShards:2, nrtReplicas:2, tlogReplicas:2, pullReplicas:2 }}", null,
- "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', tlogReplicas:'2', pullReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create}");
+ "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', tlogReplicas:'2', pullReplicas:'2', collection.configName:schemaless, numShards:'2', op:create}");
//test a create collection with custom properties
compareOutput(apiBag, "/collections", POST,
"{create:{name:'newcoll', config:'schemaless', numShards:2, replicationFactor:2, properties:{prop1:'prop1val', prop2: prop2val} }}", null,
- "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create, property.prop1:prop1val, property.prop2:prop2val}");
+ "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', op:create, property.prop1:prop1val, property.prop2:prop2val}");
// nocommit
// compareOutput(apiBag, "/collections", POST,
-// "{create-alias:{name: aliasName , collections:[c1,c2] }}", null, "{operation : createalias, name: aliasName, collections:[c1,c2] }");
+// "{create-alias:{name: aliasName , collections:[c1,c2] }}", null, "{op : createalias, name: aliasName, collections:[c1,c2] }");
compareOutput(apiBag, "/collections", POST,
- "{delete-alias:{ name: aliasName}}", null, "{operation : deletealias, name: aliasName}");
+ "{delete-alias:{ name: aliasName}}", null, "{op : deletealias, name: aliasName}");
compareOutput(apiBag, "/collections/collName", POST,
"{reload:{}}", null,
- "{name:collName, operation :reload}");
+ "{name:collName, op :reload}");
compareOutput(apiBag, "/collections/collName", DELETE,
null, null,
- "{name:collName, operation :delete}");
+ "{name:collName, op :delete}");
compareOutput(apiBag, "/collections/collName/shards/shard1", DELETE,
null, null,
- "{collection:collName, shard: shard1 , operation :deleteshard }");
+ "{collection:collName, shard: shard1 , op :deleteshard }");
compareOutput(apiBag, "/collections/collName/shards/shard1/replica1?deleteDataDir=true&onlyIfDown=true", DELETE,
null, null,
- "{collection:collName, shard: shard1, replica :replica1 , deleteDataDir:'true', onlyIfDown: 'true', operation :deletereplica }");
+ "{collection:collName, shard: shard1, replica :replica1 , deleteDataDir:'true', onlyIfDown: 'true', op :deletereplica }");
compareOutput(apiBag, "/collections/collName/shards", POST,
"{split:{shard:shard1, ranges: '0-1f4,1f5-3e8,3e9-5dc', coreProperties : {prop1:prop1Val, prop2:prop2Val} }}", null,
- "{collection: collName , shard : shard1, ranges :'0-1f4,1f5-3e8,3e9-5dc', operation : splitshard, property.prop1:prop1Val, property.prop2: prop2Val}"
+ "{collection: collName , shard : shard1, ranges :'0-1f4,1f5-3e8,3e9-5dc', op : splitshard, property.prop1:prop1Val, property.prop2: prop2Val}"
);
compareOutput(apiBag, "/collections/collName/shards", POST,
"{add-replica:{shard: shard1, node: 'localhost_8978' , coreProperties : {prop1:prop1Val, prop2:prop2Val} }}", null,
- "{collection: collName , shard : shard1, node :'localhost_8978', operation : addreplica, property.prop1:prop1Val, property.prop2: prop2Val}"
+ "{collection: collName , shard : shard1, node :'localhost_8978', op : addreplica, property.prop1:prop1Val, property.prop2: prop2Val}"
);
compareOutput(apiBag, "/collections/collName/shards", POST,
"{split:{ splitKey:id12345, coreProperties : {prop1:prop1Val, prop2:prop2Val} }}", null,
- "{collection: collName , split.key : id12345 , operation : splitshard, property.prop1:prop1Val, property.prop2: prop2Val}"
+ "{collection: collName , split.key : id12345 , op : splitshard, property.prop1:prop1Val, property.prop2: prop2Val}"
);
compareOutput(apiBag, "/collections/collName/shards", POST,
"{add-replica:{shard: shard1, node: 'localhost_8978' , type:'TLOG' }}", null,
- "{collection: collName , shard : shard1, node :'localhost_8978', operation : addreplica, type: TLOG}"
+ "{collection: collName , shard : shard1, node :'localhost_8978', op : addreplica, type: TLOG}"
);
compareOutput(apiBag, "/collections/collName/shards", POST,
"{add-replica:{shard: shard1, node: 'localhost_8978' , type:'PULL' }}", null,
- "{collection: collName , shard : shard1, node :'localhost_8978', operation : addreplica, type: PULL}"
+ "{collection: collName , shard : shard1, node :'localhost_8978', op : addreplica, type: PULL}"
);
assertErrorContains(apiBag, "/collections/collName/shards", POST,
@@ -158,36 +158,36 @@ public class TestCollectionAPIs extends SolrTestCaseJ4 {
compareOutput(apiBag, "/collections/collName", POST,
"{add-replica-property : {name:propA , value: VALA, shard: shard1, replica:replica1}}", null,
- "{collection: collName, shard: shard1, replica : replica1 , property : propA , operation : addreplicaprop, property.value : 'VALA'}"
+ "{collection: collName, shard: shard1, replica : replica1 , property : propA , op : addreplicaprop, property.value : 'VALA'}"
);
compareOutput(apiBag, "/collections/collName", POST,
"{delete-replica-property : {property: propA , shard: shard1, replica:replica1} }", null,
- "{collection: collName, shard: shard1, replica : replica1 , property : propA , operation : deletereplicaprop}"
+ "{collection: collName, shard: shard1, replica : replica1 , property : propA , op : deletereplicaprop}"
);
compareOutput(apiBag, "/cluster", POST,
"{add-role : {role : overseer, node : 'localhost_8978'} }", null,
- "{operation : addrole ,role : overseer, node : 'localhost_8978'}"
+ "{op : addrole ,role : overseer, node : 'localhost_8978'}"
);
compareOutput(apiBag, "/cluster", POST,
"{remove-role : {role : overseer, node : 'localhost_8978'} }", null,
- "{operation : removerole ,role : overseer, node : 'localhost_8978'}"
+ "{op : removerole ,role : overseer, node : 'localhost_8978'}"
);
compareOutput(apiBag, "/collections/coll1", POST,
"{balance-shard-unique : {property: preferredLeader} }", null,
- "{operation : balanceshardunique ,collection : coll1, property : preferredLeader}"
+ "{op : balanceshardunique ,collection : coll1, property : preferredLeader}"
);
compareOutput(apiBag, "/collections/coll1", POST,
"{migrate-docs : {forwardTimeout: 1800, target: coll2, splitKey: 'a123!'} }", null,
- "{operation : migrate ,collection : coll1, target.collection:coll2, forward.timeout:1800, split.key:'a123!'}"
+ "{op : migrate ,collection : coll1, target.collection:coll2, forward.timeout:1800, split.key:'a123!'}"
);
compareOutput(apiBag, "/collections/coll1", POST,
"{set-collection-property : {name: 'foo', value:'bar'} }", null,
- "{operation : collectionprop, name : coll1, propertyName:'foo', propertyValue:'bar'}"
+ "{op : collectionprop, name : coll1, propertyName:'foo', propertyValue:'bar'}"
);
}
@@ -279,7 +279,7 @@ public class TestCollectionAPIs extends SolrTestCaseJ4 {
void invokeAction(SolrQueryRequest req, SolrQueryResponse rsp,
CoreContainer cores,
CollectionParams.CollectionAction action,
- CollectionOperation operation) throws Exception {
+ CollectionOperation op) throws Exception {
Map<String, Object> result = null;
if (action == CollectionParams.CollectionAction.COLLECTIONPROP) {
//Fake this action, since we don't want to write to ZooKeeper in this test
@@ -288,10 +288,10 @@ public class TestCollectionAPIs extends SolrTestCaseJ4 {
result.put(PROPERTY_NAME, req.getParams().required().get(PROPERTY_NAME));
result.put(PROPERTY_VALUE, req.getParams().required().get(PROPERTY_VALUE));
} else {
- result = operation.execute(req, rsp, this);
+ result = op.execute(req, rsp, this);
}
if (result != null) {
- result.put(QUEUE_OPERATION, operation.action.toLower());
+ result.put(QUEUE_OPERATION, op.action.toLower());
rsp.add(ZkNodeProps.class.getName(), new ZkNodeProps(result));
}
}
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/TestConfigsApi.java b/solr/core/src/test/org/apache/solr/handler/admin/TestConfigsApi.java
index 8360aef..c727ba7 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/TestConfigsApi.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/TestConfigsApi.java
@@ -51,10 +51,10 @@ public class TestConfigsApi extends SolrTestCaseJ4 {
ApiBag apiBag = new ApiBag(false);
for (Api api : handler.getApis()) apiBag.register(api, EMPTY_MAP);
compareOutput(apiBag, "/cluster/configs/sample", DELETE, null, null,
- "{name :sample, operation:delete}");
+ "{name :sample, op:delete}");
compareOutput(apiBag, "/cluster/configs", POST, "{create:{name : newconf, baseConfigSet: sample }}", null,
- "{operation:create, name :newconf, baseConfigSet: sample, immutable: false }");
+ "{op:create, name :newconf, baseConfigSet: sample, immutable: false }");
}
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 47d7608..e7f8670 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -42,6 +42,8 @@ public class Replica extends ZkNodeProps {
* </p>
*/
ACTIVE,
+
+ LEADER,
/**
* The first state before {@link State#RECOVERING}. A node in this state
@@ -87,6 +89,8 @@ public class Replica extends ZkNodeProps {
public static State shortStateToState(String shortState) {
if (shortState.equals("a")) {
return State.ACTIVE;
+ } if (shortState.equals("l")) {
+ return State.LEADER;
} else if (shortState.equals("r")) {
return State.RECOVERING;
} else if (shortState.equals("b")) {