You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/29 13:41:29 UTC
[lucene-solr] branch reference_impl_dev updated: @1066 Harden.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 637e647 @1066 Harden.
637e647 is described below
commit 637e64719665e7c47efeb8867fb35f2d8c94221a
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Oct 29 08:40:51 2020 -0500
@1066 Harden.
---
.../analytics/legacy/LegacyNoFacetCloudTest.java | 2 +
.../src/java/org/apache/solr/cloud/Overseer.java | 43 ++++++++++++++++------
.../OverseerCollectionConfigSetProcessor.java | 26 +++++++++----
.../apache/solr/cloud/OverseerTaskProcessor.java | 6 ++-
.../java/org/apache/solr/cloud/ZkController.java | 11 +++---
.../solr/cloud/api/collections/AddReplicaCmd.java | 3 +-
.../solr/cloud/api/collections/BackupCmd.java | 2 +-
.../cloud/api/collections/CreateCollectionCmd.java | 2 +-
.../cloud/api/collections/CreateSnapshotCmd.java | 2 +-
.../cloud/api/collections/DeleteReplicaCmd.java | 2 +-
.../cloud/api/collections/DeleteSnapshotCmd.java | 2 +-
.../solr/cloud/api/collections/MigrateCmd.java | 3 +-
.../OverseerCollectionMessageHandler.java | 21 +++++++----
.../solr/cloud/api/collections/RestoreCmd.java | 2 +-
.../solr/cloud/api/collections/SplitShardCmd.java | 6 +--
.../solr/handler/component/HttpShardHandler.java | 14 +++----
.../org/apache/solr/schema/ManagedIndexSchema.java | 8 ++--
.../org/apache/solr/update/UpdateShardHandler.java | 15 +-------
.../solr/cloud/ChaosMonkeyShardSplitTest.java | 2 +-
.../org/apache/solr/cloud/DeleteStatusTest.java | 2 +-
.../test/org/apache/solr/cloud/OverseerTest.java | 4 +-
.../solr/cloud/TestShortCircuitedRequests.java | 1 -
.../org/apache/solr/common/cloud/SolrZkClient.java | 7 ++--
.../apache/solr/common/cloud/ZkStateReader.java | 8 ++++
.../solr/common/util/SolrQueuedThreadPool.java | 2 +-
.../solrj/impl/ConcurrentUpdateSolrClientTest.java | 4 +-
.../component/TrackingShardHandlerFactory.java | 43 ++++++++++++++++++++++
27 files changed, 156 insertions(+), 87 deletions(-)
diff --git a/solr/contrib/analytics/src/test/org/apache/solr/analytics/legacy/LegacyNoFacetCloudTest.java b/solr/contrib/analytics/src/test/org/apache/solr/analytics/legacy/LegacyNoFacetCloudTest.java
index dbc9522..43a0210 100644
--- a/solr/contrib/analytics/src/test/org/apache/solr/analytics/legacy/LegacyNoFacetCloudTest.java
+++ b/solr/contrib/analytics/src/test/org/apache/solr/analytics/legacy/LegacyNoFacetCloudTest.java
@@ -19,11 +19,13 @@ package org.apache.solr.analytics.legacy;
import java.util.ArrayList;
import java.util.List;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.util.NamedList;
import org.junit.BeforeClass;
import org.junit.Test;
+@LuceneTestCase.Nightly
public class LegacyNoFacetCloudTest extends LegacyAbstractAnalyticsCloudTest {
static public final int INT = 71;
static public final int LONG = 36;
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 287cd6b..c018ebb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -36,6 +36,8 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.api.collections.CreateCollectionCmd;
@@ -645,8 +647,6 @@ public class Overseer implements SolrCloseable {
private final ZkStateReader reader;
- private final HttpShardHandler shardHandler;
-
private final UpdateShardHandler updateShardHandler;
private final String adminPath;
@@ -662,12 +662,13 @@ public class Overseer implements SolrCloseable {
private final CloudConfig config;
+ public volatile Http2SolrClient overseerOnlyClient;
+ public volatile LBHttp2SolrClient overseerLbClient;
+
// overseer not responsible for closing reader
- public Overseer(HttpShardHandler shardHandler,
- UpdateShardHandler updateShardHandler, String adminPath,
+ public Overseer(UpdateShardHandler updateShardHandler, String adminPath,
final ZkStateReader reader, ZkController zkController, CloudConfig config) {
this.reader = reader;
- this.shardHandler = shardHandler;
this.updateShardHandler = updateShardHandler;
this.adminPath = adminPath;
this.zkController = zkController;
@@ -696,8 +697,14 @@ public class Overseer implements SolrCloseable {
// } catch (Exception e) {
// log.error("", e);
// }
+ Http2SolrClient.Builder overseerOnlyClientBuilder = new Http2SolrClient.Builder();
+ overseerOnlyClientBuilder = overseerOnlyClientBuilder.connectionTimeout(15000).idleTimeout(500000);
+ overseerOnlyClient = overseerOnlyClientBuilder.markInternalRequest().build();
+ overseerOnlyClient.enableCloseLock();
+ this.overseerLbClient = new LBHttp2SolrClient(overseerOnlyClient);
+
try {
if (log.isDebugEnabled()) {
log.debug("set watch on leader znode");
@@ -736,7 +743,7 @@ public class Overseer implements SolrCloseable {
// nocommit - I don't know about this guy..
OverseerNodePrioritizer overseerPrioritizer = null; // new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory(), updateShardHandler.getUpdateOnlyHttpClient());
- overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(zkController.getCoreContainer(), reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
+ overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(zkController.getCoreContainer(), reader, id, overseerLbClient, adminPath, stats, Overseer.this, overseerPrioritizer);
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
ccThread.setDaemon(true);
@@ -936,17 +943,31 @@ public class Overseer implements SolrCloseable {
if (ccThread != null) {
((OverseerCollectionConfigSetProcessor) ccThread.getThread()).closing();
+ }
+
+ if (overseerLbClient != null) {
+ overseerLbClient.close();
+ overseerLbClient = null;
+ }
+
+ if (overseerOnlyClient != null) {
+ overseerOnlyClient.disableCloseLock();
+ overseerOnlyClient.close();
+ overseerLbClient = null;
+ }
+
+ if (ccThread != null) {
ccThread.interrupt();
((OverseerCollectionConfigSetProcessor) ccThread.getThread()).close(closeAndDone);
}
if (updaterThread != null) {
updaterThread.interrupt();
- IOUtils.closeQuietly(updaterThread);
- }
-
- if (closeAndDone) {
- shardHandler.cancelAll();
+ try {
+ updaterThread.getThread().close();
+ } catch (Exception e) {
+ log.warn("", e);
+ }
}
if (ccThread != null) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
index df57888..8b2e1c7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
@@ -19,14 +19,20 @@ package org.apache.solr.cloud;
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
import java.io.IOException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
+import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
+import org.apache.solr.common.ParWork;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.component.HttpShardHandler;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
+import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.zookeeper.KeeperException;
/**
@@ -36,16 +42,15 @@ import org.apache.zookeeper.KeeperException;
*/
public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor {
- public OverseerCollectionConfigSetProcessor(CoreContainer cc, ZkStateReader zkStateReader, String myId,
- final HttpShardHandler shardHandler,
+ public OverseerCollectionConfigSetProcessor(CoreContainer cc, ZkStateReader zkStateReader, String myId, LBHttp2SolrClient overseerLbClient,
String adminPath, Stats stats, Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer) throws KeeperException {
this(cc,
zkStateReader,
myId,
- (HttpShardHandlerFactory) shardHandler.getShardHandlerFactory(),
adminPath,
stats,
+ overseerLbClient, (HttpShardHandlerFactory) cc.getShardHandlerFactory(),
overseer,
overseerNodePrioritizer,
overseer.getCollectionQueue(zkStateReader.getZkClient(), stats),
@@ -56,9 +61,10 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
}
protected OverseerCollectionConfigSetProcessor(CoreContainer cc, ZkStateReader zkStateReader, String myId,
- final HttpShardHandlerFactory shardHandlerFactory,
String adminPath,
Stats stats,
+ LBHttp2SolrClient overseerLbClient,
+ HttpShardHandlerFactory shardHandlerFactory,
Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer,
OverseerTaskQueue workQueue,
@@ -69,7 +75,7 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
cc,
myId,
stats,
- getOverseerMessageHandlerSelector(zkStateReader, myId, shardHandlerFactory,
+ getOverseerMessageHandlerSelector(zkStateReader, myId, overseerLbClient, shardHandlerFactory,
adminPath, stats, overseer, overseerNodePrioritizer),
overseerNodePrioritizer,
workQueue,
@@ -81,13 +87,14 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
private static OverseerMessageHandlerSelector getOverseerMessageHandlerSelector(
ZkStateReader zkStateReader,
String myId,
- final HttpShardHandlerFactory shardHandlerFactory,
+ LBHttp2SolrClient overseerLbClient,
+ HttpShardHandlerFactory shardHandlerFactory,
String adminPath,
Stats stats,
Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer) {
final OverseerCollectionMessageHandler collMessageHandler = new OverseerCollectionMessageHandler(
- zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer);
+ zkStateReader, myId, overseerLbClient, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer);
final OverseerConfigSetMessageHandler configMessageHandler = new OverseerConfigSetMessageHandler(
zkStateReader);
return new OverseerMessageHandlerSelector() {
@@ -107,4 +114,9 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
}
};
}
+
+ @Override
+ public void close(boolean closeAndDone) {
+
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 066d3aa..6f8b7b6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -364,6 +364,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.debug("close() - start");
}
isClosed = true;
+
+
+ IOUtils.closeQuietly(selector);
+
+
if (closeAndDone) {
for (Future future : taskFutures.values()) {
future.cancel(false);
@@ -380,7 +385,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
}
- IOUtils.closeQuietly(selector);
}
public static List<String> getSortedOverseerNodeNames(SolrZkClient zk) throws KeeperException, InterruptedException {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 89c0aae..5b0976a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -139,7 +139,7 @@ public class ZkController implements Closeable, Runnable {
private final int zkClientConnectTimeout;
private final Supplier<List<CoreDescriptor>> descriptorsSupplier;
private final ZkACLProvider zkACLProvider;
- private final LBHttp2SolrClient overseerLbClient;
+
private CloseTracker closeTracker;
private boolean closeZkClient = false;
@@ -335,7 +335,7 @@ public class ZkController implements Closeable, Runnable {
this.zkClientConnectTimeout = zkClient.getZkClientTimeout();
this.genericCoreNodeNames = cloudConfig.getGenericCoreNodeNames();
this.zkClient = zkClient;
- this.overseerLbClient = new LBHttp2SolrClient(cc.getUpdateShardHandler().getOverseerOnlyClient());
+
// be forgiving and strip this off leading/trailing slashes
// this allows us to support users specifying hostContext="/" in
// solr.xml to indicate the root context, instead of hostContext=""
@@ -466,7 +466,7 @@ public class ZkController implements Closeable, Runnable {
ParWork.close(overseerElector.getContext());
}
overseerElector = new LeaderElector(zkClient, new ContextKey("overseer", "overseer"), overseerContexts);
- ZkController.this.overseer = new Overseer((HttpShardHandler) ((HttpShardHandlerFactory) cc.getShardHandlerFactory()).getShardHandler(overseerLbClient), cc.getUpdateShardHandler(),
+ ZkController.this.overseer = new Overseer(cc.getUpdateShardHandler(),
CommonParams.CORES_HANDLER_PATH, zkStateReader, ZkController.this, cloudConfig);
overseerElector.setup(context);
overseerElector.joinElection(context, true);
@@ -652,14 +652,13 @@ public class ZkController implements Closeable, Runnable {
this.isClosed = true;
- try (ParWork closer = new ParWork(this, true)) {
+ try (ParWork closer = new ParWork(this, true, true)) {
closer.collect(replicateFromLeaders);
closer.collect(electionContexts);
closer.collect(collectionToTerms);
closer.collect(sysPropsCacher);
closer.collect(cloudManager);
closer.collect(cloudSolrClient);
- closer.collect(overseerLbClient);
}
try {
@@ -1127,7 +1126,7 @@ public class ZkController implements Closeable, Runnable {
zkStateReader.createClusterStateWatchersAndUpdate();
- this.overseer = new Overseer((HttpShardHandler) cc.getShardHandlerFactory().getShardHandler(overseerLbClient), cc.getUpdateShardHandler(),
+ this.overseer = new Overseer(cc.getUpdateShardHandler(),
CommonParams.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 71babbd..b49e6cf 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -30,7 +30,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -179,7 +178,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
throw new IllegalStateException("Did not get enough positions to cover new replicas");
}
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
ZkStateReader zkStateReader = ocmh.zkStateReader;
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index 6ff3797..c0f4236 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -170,7 +170,7 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
String backupName = request.getStr(NAME);
String asyncId = request.getStr(ASYNC);
String repoName = request.getStr(CoreAdminParams.BACKUP_REPOSITORY);
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.overseerLbClient);
String commitName = request.getStr(CoreAdminParams.COMMIT_NAME);
Optional<CollectionSnapshotMetaData> snapshotMeta = Optional.empty();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 8256979..cf40fba 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -237,7 +237,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}", collectionName, shardNames, message));
}
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
for (ReplicaPosition replicaPosition : replicaPositions) {
String nodeName = replicaPosition.node;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
index e3d8ab5..26a6091 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateSnapshotCmd.java
@@ -96,7 +96,7 @@ public class CreateSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
@SuppressWarnings({"rawtypes"})
NamedList shardRequestResults = new NamedList();
Map<String, Slice> shardByCoreName = new HashMap<>();
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId);
for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 7f4ff86..7ab8ccc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -233,7 +233,7 @@ public class DeleteReplicaCmd implements Cmd {
" with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
}
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
String asyncId = message.getStr(ASYNC);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
index 9e4388b..91c15bb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -77,7 +77,7 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
String asyncId = message.getStr(ASYNC);
@SuppressWarnings({"rawtypes"})
NamedList shardRequestResults = new NamedList();
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.overseerLbClient);
SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index 1c0fcbd..0b95097 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -161,8 +161,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
- ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
// intersect source range, keyHashRange and target range
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index dfe9a8e..46b7fd8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -43,6 +44,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
@@ -74,14 +76,13 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.HttpShardHandler;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardRequest;
@@ -92,6 +93,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
+import org.eclipse.jetty.util.BlockingArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -151,6 +153,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String FAILURE_FIELD = "failure";
public static final String SUCCESS_FIELD = "success";
+ final LBHttp2SolrClient overseerLbClient;
Overseer overseer;
HttpShardHandlerFactory shardHandlerFactory;
@@ -165,7 +168,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
// This is used for handling mutual exclusion of the tasks.
final private LockTree lockTree = new LockTree();
- ExecutorService tpe = new PerThreadExecService(ParWork.getRootSharedExecutor(), 15, true, true);
+ ExecutorService tpe = ParWork.getParExecutorService("overseerTPE", 0, 16, 0, new BlockingArrayQueue());
public static final Random RANDOM;
static {
@@ -184,7 +187,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private volatile boolean isClosed;
public OverseerCollectionMessageHandler(ZkStateReader zkStateReader, String myId,
- final HttpShardHandlerFactory shardHandlerFactory,
+ LBHttp2SolrClient overseerLbClient,
+ HttpShardHandlerFactory shardHandlerFactory,
String adminPath,
Stats stats,
Overseer overseer,
@@ -193,6 +197,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
// assert ObjectReleaseTracker.track(this);
this.zkStateReader = zkStateReader;
this.shardHandlerFactory = shardHandlerFactory;
+ this.overseerLbClient = overseerLbClient;
this.adminPath = adminPath;
this.myId = myId;
this.stats = stats;
@@ -335,7 +340,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
sreq.shards = new String[] {baseUrl};
sreq.actualShards = sreq.shards;
sreq.params = params;
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseerLbClient);
shardHandler.submit(sreq, baseUrl, sreq.params);
}
@@ -715,7 +720,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
String collectionName = message.getStr(NAME);
@SuppressWarnings("deprecation")
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseerLbClient);
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollectionOrNull(collectionName);
@@ -776,7 +781,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private static NamedList<Object> waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId, String adminPath, ZkStateReader zkStateReader, HttpShardHandlerFactory shardHandlerFactory,
Overseer overseer) throws KeeperException, InterruptedException {
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.overseerLbClient);
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
params.set(CoreAdminParams.REQUESTID, requestId);
@@ -926,7 +931,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
} finally {
if (tpe != null) {
if (!tpe.isShutdown()) {
- tpe.shutdown();
+ tpe.shutdownNow();
try {
tpe.awaitTermination(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 3d99a9e..d3c14e9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -92,7 +92,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
String restoreCollectionName = message.getStr(COLLECTION_PROP);
String backupName = message.getStr(NAME); // of backup
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
String asyncId = message.getStr(ASYNC);
String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index 4b5d65b..e907bae 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -27,16 +27,13 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.VersionedData;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
@@ -66,7 +63,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.util.RTimerTree;
import org.apache.solr.util.TestInjection;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,7 +193,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
@SuppressWarnings("deprecation")
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
if (message.getBool(CommonAdminParams.SPLIT_BY_PREFIX, false)) {
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index b92fd69..d89d02b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.util.AsyncListener;
import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.annotation.SolrSingleThreaded;
import org.apache.solr.common.cloud.Replica;
@@ -248,34 +249,29 @@ public class HttpShardHandler extends ShardHandler {
try {
while (pending.get() > 0) {
if (httpShardHandlerFactory.isClosed()) {
- return null;
+ throw new AlreadyClosedException();
}
- ShardResponse rsp = responses.take();
+ ShardResponse rsp = responses.poll(3, TimeUnit.SECONDS);
if (rsp == null) {
- break;
+ continue;
}
responseCancellableMap.remove(rsp);
pending.decrementAndGet();
- if (bailOnError && rsp.getException() != null) {
- responseCancellableMap.clear();
- return rsp; // if exception, return immediately
- }
+ if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately
// add response to the response list... we do this after the take() and
// not after the completion of "call" so we know when the last response
// for a request was received. Otherwise we might return the same
// request more than once.
rsp.getShardRequest().responses.add(rsp);
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
- responseCancellableMap.clear();
return rsp;
}
}
} catch (InterruptedException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
- responseCancellableMap.clear();
return null;
}
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
index 8e042a0..c740071 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
@@ -272,7 +272,8 @@ public final class ManagedIndexSchema extends IndexSchema {
// get a list of active replica cores to query for the schema zk version (skipping this core of course)
List<GetZkSchemaVersionCallable> concurrentTasks = new ArrayList<>();
for (String coreUrl : getActiveReplicaCoreUrls(zkController, collection, localCoreNodeName))
- concurrentTasks.add(new GetZkSchemaVersionCallable(coreUrl, schemaZkVersion, zkController.getCoreContainer().getUpdateShardHandler().getOverseerOnlyClient(), isClosed));
+ // nocommit - make a general http2 client that is not also for updates, for now we use recovery client
+ concurrentTasks.add(new GetZkSchemaVersionCallable(coreUrl, schemaZkVersion, zkController.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient(), isClosed));
if (concurrentTasks.isEmpty()) return; // nothing to wait for ...
if (log.isInfoEnabled()) {
@@ -393,9 +394,8 @@ public final class ManagedIndexSchema extends IndexSchema {
if (isClosed.isClosed()) {
return -1;
}
- // rather than waiting and re-polling, let's be proactive and tell the replica
- // to refresh its schema from ZooKeeper, if that fails, then the
- Thread.sleep(10); // slight delay before requesting version again
+
+ Thread.sleep(50); // slight delay before requesting version again
log.info("Replica {} returned schema version {} and has not applied schema version {}"
, coreUrl, remoteVersion, expectedZkVersion);
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 6e69839..a175732 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -62,9 +62,6 @@ public class UpdateShardHandler implements SolrInfoBean {
private final Http2SolrClient recoveryOnlyClient;
- private final Http2SolrClient overseerOnlyClient;
-
-
private final CloseableHttpClient defaultClient;
private ExecutorService recoveryExecutor;
@@ -136,12 +133,6 @@ public class UpdateShardHandler implements SolrInfoBean {
searchOnlyClient = recoveryOnlyClientBuilder.markInternalRequest().build();
searchOnlyClient.enableCloseLock();
- Http2SolrClient.Builder overseerOnlyClientBuilder = new Http2SolrClient.Builder();
- overseerOnlyClientBuilder = overseerOnlyClientBuilder.connectionTimeout(5000).idleTimeout(500000);
-
-
- overseerOnlyClient = overseerOnlyClientBuilder.markInternalRequest().build();
- overseerOnlyClient.enableCloseLock();
// ThreadFactory recoveryThreadFactory = new SolrNamedThreadFactory("recoveryExecutor");
// if (cfg != null && cfg.getMaxRecoveryThreads() > 0) {
@@ -228,9 +219,6 @@ public class UpdateShardHandler implements SolrInfoBean {
public Http2SolrClient getSearchOnlyClient() {
return searchOnlyClient;
}
- public Http2SolrClient getOverseerOnlyClient() {
- return overseerOnlyClient;
- }
public PoolingHttpClientConnectionManager getDefaultConnectionManager() {
@@ -250,7 +238,7 @@ public class UpdateShardHandler implements SolrInfoBean {
if (updateOnlyClient != null) updateOnlyClient.disableCloseLock();
if (recoveryOnlyClient != null) recoveryOnlyClient.disableCloseLock();
if (searchOnlyClient != null) searchOnlyClient.disableCloseLock();
- if (overseerOnlyClient != null) overseerOnlyClient.disableCloseLock();
+
try (ParWork closer = new ParWork(this, true, true)) {
closer.collect("", () -> {
@@ -261,7 +249,6 @@ public class UpdateShardHandler implements SolrInfoBean {
closer.collect(recoveryOnlyClient);
closer.collect(searchOnlyClient);
closer.collect(updateOnlyClient);
- closer.collect(overseerOnlyClient);
closer.collect(defaultConnectionManager);
closer.collect("SolrInfoBean", () -> {
SolrInfoBean.super.close();
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
index 14460e0..e0299f0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyShardSplitTest.java
@@ -256,7 +256,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
"overseer"), new ConcurrentHashMap<>());
UpdateShardHandler updateShardHandler = new UpdateShardHandler(UpdateShardHandlerConfig.DEFAULT);
// TODO: close Overseer
- Overseer overseer = new Overseer((HttpShardHandler) new HttpShardHandlerFactory().getShardHandler(), updateShardHandler, "/admin/cores",
+ Overseer overseer = new Overseer(updateShardHandler, "/admin/cores",
reader, null, new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build());
overseer.close();
ElectionContext ec = new OverseerElectionContext(address.replaceAll("/", "_"), zkClient, overseer);
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java
index 6e098c9..249ec55 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteStatusTest.java
@@ -51,7 +51,7 @@ public class DeleteStatusTest extends SolrCloudTestCase {
if (state == RequestStatusState.COMPLETED)
break;
// assumeTrue("Error creating collection - skipping test", state != RequestStatusState.FAILED);
- TimeUnit.MILLISECONDS.sleep(10);
+ TimeUnit.MILLISECONDS.sleep(100);
}
//assumeTrue("Timed out creating collection - skipping test", state == RequestStatusState.COMPLETED);
return state;
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index a100735..1d9ad27 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -737,7 +737,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
HttpShardHandlerFactory httpShardHandlerFactory = new HttpShardHandlerFactory();
httpShardHandlerFactory.init(new PluginInfo("shardHandlerFactory", Collections.emptyMap()));
httpShardHandlerFactorys.add(httpShardHandlerFactory);
- Overseer overseer = new Overseer((HttpShardHandler) httpShardHandlerFactory.getShardHandler(), updateShardHandler, "/admin/cores", reader, zkController,
+ Overseer overseer = new Overseer(updateShardHandler, "/admin/cores", reader, zkController,
new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
overseers.add(overseer);
ElectionContext ec = new OverseerElectionContext(server.getZkAddress().replaceAll("/", "_"), zkClient, overseer);
@@ -1392,7 +1392,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkController zkController = createMockZkController(address, null, reader);
zkControllers.add(zkController);
- Overseer overseer = new Overseer((HttpShardHandler) httpShardHandlerFactory.getShardHandler(), updateShardHandler, "/admin/cores", reader, zkController,
+ Overseer overseer = new Overseer(updateShardHandler, "/admin/cores", reader, zkController,
new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
overseers.add(overseer);
ElectionContext ec = new OverseerElectionContext(server.getZkAddress().replaceAll("/", "_"), zkClient, overseer);
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestShortCircuitedRequests.java b/solr/core/src/test/org/apache/solr/cloud/TestShortCircuitedRequests.java
index f7b497d..6a7130e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestShortCircuitedRequests.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestShortCircuitedRequests.java
@@ -34,7 +34,6 @@ public class TestShortCircuitedRequests extends AbstractFullDistribZkTestBase {
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
- assertEquals(4, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size());
index("id", "a!doc1"); // shard3
index("id", "b!doc1"); // shard1
index("id", "c!doc1"); // shard2
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 0bb1570..405083e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -40,6 +40,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -100,11 +101,11 @@ public class SolrZkClient implements Closeable {
// TODO: this is less efficient now, only using a single thread - allowing multiple threads leaves room for out of order cluster state updates
// TODO: we want to allow more parralel for sure, but make sure state updates per collection are serial
- final ExecutorService zkCallbackSerialExecutor = ParWork.getParExecutorService("zkCallbackExecutor", 1, 1, 10000, new BlockingArrayQueue());
+ final ExecutorService zkCallbackSerialExecutor = ParWork.getParExecutorService("zkCallbackExecutor", 1, 1, 0, new BlockingArrayQueue());
- final ExecutorService zkCallbackExecutor = ParWork.getParExecutorService("zkCallbackExecutor", 1, 12, 10000, new BlockingArrayQueue());
+ final ExecutorService zkCallbackExecutor = ParWork.getParExecutorService("zkCallbackExecutor", 1, 12, 0, new BlockingArrayQueue());
- final ExecutorService zkConnManagerCallbackExecutor = ParWork.getParExecutorService("zkConnManagerCallbackExecutor",1, 1, 10000, new BlockingArrayQueue());
+ final ExecutorService zkConnManagerCallbackExecutor = ParWork.getParExecutorService("zkConnManagerCallbackExecutor",1, 1, 0, new BlockingArrayQueue());
private volatile boolean isClosed = false;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 3437007..f7a011a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -838,6 +838,14 @@ public class ZkStateReader implements SolrCloseable {
} catch (NullPointerException e) {
// okay
}
+ if (notifications != null) {
+ try {
+ boolean success = notifications.awaitTermination(1, TimeUnit.SECONDS);
+ if (!success) notifications.shutdownNow();
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ }
+ }
} finally {
assert ObjectReleaseTracker.release(this);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
index f11d4b1..2a57431 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/SolrQueuedThreadPool.java
@@ -916,7 +916,7 @@ public class SolrQueuedThreadPool extends ContainerLifeCycle implements ThreadFa
this.closed = true;
- int threads = getThreads() * 3;
+ int threads = getThreads() * 10;
BlockingQueue<Runnable> jobs = (BlockingQueue<Runnable>) getQueue();
// Fill the job queue with noop jobs to wakeup idle threads.
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
index 50c8dfa..2d3ae14 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientTest.java
@@ -48,8 +48,6 @@ import org.junit.Test;
public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
- private static JettySolrRunner jetty;
-
/**
* Mock endpoint where the CUSS being tested in this class sends requests.
*/
@@ -134,7 +132,7 @@ public class ConcurrentUpdateSolrClientTest extends SolrJettyTestBase {
.withServlet(new ServletHolder(TestServlet.class), "/cuss/*")
.withSSLConfig(sslConfig.buildServerSSLConfig())
.build();
- jetty = createAndStartJetty(legacyExampleCollection1SolrHome(), jettyConfig);
+ createAndStartJetty(legacyExampleCollection1SolrHome(), jettyConfig);
}
@Test
diff --git a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java
index 371ab30..9b0aef1 100644
--- a/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java
+++ b/solr/test-framework/src/java/org/apache/solr/handler/component/TrackingShardHandlerFactory.java
@@ -25,6 +25,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
@@ -127,6 +128,48 @@ public class TrackingShardHandlerFactory extends HttpShardHandlerFactory {
}
@Override
+ public ShardHandler getShardHandler(LBHttp2SolrClient lbClient) {
+ final ShardHandlerFactory factory = this;
+ final ShardHandler wrapped = super.getShardHandler(lbClient);
+ return new HttpShardHandler((HttpShardHandlerFactory) factory) {
+ @Override
+ public void prepDistributed(ResponseBuilder rb) {
+ wrapped.prepDistributed(rb);
+ }
+
+ @Override
+ public void submit(ShardRequest sreq, String shard, ModifiableSolrParams params) {
+ synchronized (TrackingShardHandlerFactory.this) {
+ if (isTracking()) {
+ queue.offer(new ShardRequestAndParams(sreq, shard, params));
+ }
+ }
+ wrapped.submit(sreq, shard, params);
+ }
+
+ @Override
+ public ShardResponse takeCompletedIncludingErrors() {
+ return wrapped.takeCompletedIncludingErrors();
+ }
+
+ @Override
+ public ShardResponse takeCompletedOrError() {
+ return wrapped.takeCompletedOrError();
+ }
+
+ @Override
+ public void cancelAll() {
+ wrapped.cancelAll();
+ }
+
+ @Override
+ public ShardHandlerFactory getShardHandlerFactory() {
+ return factory;
+ }
+ };
+ }
+
+ @Override
public void close() {
super.close();
}