You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2018/11/29 18:19:23 UTC
[14/16] lucene-solr:master: SOLR-12801: Make massive improvements to
the tests.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index f1767ee..e5f6f2d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -46,7 +46,6 @@ import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.core.snapshots.SolrSnapshotManager;
import org.apache.solr.handler.admin.MetricsHistoryHandler;
import org.apache.solr.metrics.SolrMetricManager;
-import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,24 +126,26 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
// wait for a while until we don't see the collection
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
- boolean removed = false;
- while (! timeout.hasTimedOut()) {
- timeout.sleep(100);
- removed = !zkStateReader.getClusterState().hasCollection(collection);
- if (removed) {
- timeout.sleep(500); // just a bit of time so it's more likely other
- // readers see on return
- break;
- }
- }
- if (!removed) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Could not fully remove collection: " + collection);
- }
+ zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null);
+
+// TimeOut timeout = new TimeOut(60, TimeUnit.SECONDS, timeSource);
+// boolean removed = false;
+// while (! timeout.hasTimedOut()) {
+// timeout.sleep(100);
+// removed = !zkStateReader.getClusterState().hasCollection(collection);
+// if (removed) {
+// timeout.sleep(500); // just a bit of time so it's more likely other
+// // readers see on return
+// break;
+// }
+// }
+// if (!removed) {
+// throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+// "Could not fully remove collection: " + collection);
+// }
} finally {
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index 4dbc059..ec158bb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -218,7 +218,7 @@ public class DeleteReplicaCmd implements Cmd {
" with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
}
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
String asyncId = message.getStr(ASYNC);
AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
@@ -246,7 +246,7 @@ public class DeleteReplicaCmd implements Cmd {
ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
//check if the core unload removed the corenode zk entry
- if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE;
+ if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
}
// try and ensure core info is removed from cluster state
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 2ef2955..fa50c4a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -17,6 +17,13 @@
*/
package org.apache.solr.cloud.api.collections;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,12 +33,10 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -41,18 +46,10 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final OverseerCollectionMessageHandler ocmh;
@@ -85,13 +82,12 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
if (state == Slice.State.RECOVERY) {
// mark the slice as 'construction' and only then try to delete the cores
// see SOLR-9455
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(ocmh.zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(sliceId, Slice.State.CONSTRUCTION.toString());
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
}
String asyncId = message.getStr(ASYNC);
@@ -129,29 +125,14 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
log.debug("Waiting for delete shard action to complete");
- cleanupLatch.await(5, TimeUnit.MINUTES);
+ cleanupLatch.await(1, TimeUnit.MINUTES);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
ZkStateReader zkStateReader = ocmh.zkStateReader;
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
-
- // wait for a while until we don't see the shard
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
- boolean removed = false;
- while (!timeout.hasTimedOut()) {
- timeout.sleep(100);
- DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
- removed = collection.getSlice(sliceId) == null;
- if (removed) {
- timeout.sleep(100); // just a bit of time so it's more likely other readers see on return
- break;
- }
- }
- if (!removed) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Could not fully remove collection: " + collectionName + " shard: " + sliceId);
- }
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+
+ zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (l, c) -> c.getSlice(sliceId) == null);
log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
} catch (SolrException e) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
index cf0a234..21d9cb0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -69,7 +69,7 @@ public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
String asyncId = message.getStr(ASYNC);
Map<String, String> requestMap = new HashMap<>();
NamedList shardRequestResults = new NamedList();
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index 59b7218..f22544a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -42,6 +42,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.update.SolrIndexSplitter;
@@ -146,7 +147,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ((HttpShardHandlerFactory)shardHandlerFactory).getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
// intersect source range, keyHashRange and target range
@@ -181,7 +182,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
"targetCollection", targetCollection.getName(),
"expireAt", RoutingRule.makeExpiryAt(timeout));
log.info("Adding routing rule: " + m);
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
// wait for a while until we see the new rule
log.info("Waiting to see routing rule updated in clusterstate");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index a724bc7..e67fc7f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -16,6 +16,58 @@
*/
package org.apache.solr.cloud.api.collections;
+import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
+import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
+import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ALIASPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.UTILIZENODE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@@ -30,13 +82,12 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
@@ -79,8 +130,8 @@ import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.logging.MDCLoggingContext;
@@ -92,25 +143,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
-import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
-import static org.apache.solr.common.params.CollectionAdminParams.COLOCATED_WITH;
-import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTION;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
+import com.google.common.collect.ImmutableMap;
/**
* A {@link OverseerMessageHandler} that handles Collections API related
@@ -158,7 +191,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Overseer overseer;
- ShardHandlerFactory shardHandlerFactory;
+ HttpShardHandlerFactory shardHandlerFactory;
String adminPath;
ZkStateReader zkStateReader;
SolrCloudManager cloudManager;
@@ -191,7 +224,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private volatile boolean isClosed;
public OverseerCollectionMessageHandler(ZkStateReader zkStateReader, String myId,
- final ShardHandlerFactory shardHandlerFactory,
+ final HttpShardHandlerFactory shardHandlerFactory,
String adminPath,
Stats stats,
Overseer overseer,
@@ -334,7 +367,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
sreq.shards = new String[] {baseUrl};
sreq.actualShards = sreq.shards;
sreq.params = params;
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
shardHandler.submit(sreq, baseUrl, sreq.params);
}
@@ -343,24 +376,22 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
throws Exception {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
SolrZkClient zkClient = zkStateReader.getZkClient();
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
propMap.putAll(message.getProperties());
ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
+ overseer.offerStateUpdate(Utils.toJSON(m));
}
private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
throws Exception {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
SolrZkClient zkClient = zkStateReader.getZkClient();
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
propMap.putAll(message.getProperties());
ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
+ overseer.offerStateUpdate(Utils.toJSON(m));
}
private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
@@ -370,11 +401,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
"' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
}
SolrZkClient zkClient = zkStateReader.getZkClient();
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
- propMap.putAll(message.getProperties());
- inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+ Map<String, Object> m = new HashMap<>();
+ m.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
+ m.putAll(message.getProperties());
+ overseer.offerStateUpdate(Utils.toJSON(m));
}
/**
@@ -417,20 +447,21 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
- TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS, timeSource);
- while (! timeout.hasTimedOut()) {
- timeout.sleep(100);
- DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
- if (docCollection == null) { // someone already deleted the collection
- return true;
- }
- Slice slice = docCollection.getSlice(shard);
- if(slice == null || slice.getReplica(replicaName) == null) {
- return true;
- }
+ try {
+ zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (n, c) -> {
+ if (c == null)
+ return true;
+ Slice slice = c.getSlice(shard);
+ if(slice == null || slice.getReplica(replicaName) == null) {
+ return true;
+ }
+ return false;
+ });
+ } catch (TimeoutException e) {
+ return false;
}
- // replica still exists after the timeout
- return false;
+
+ return true;
}
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
@@ -441,7 +472,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.CORE_NODE_NAME_PROP, replicaName,
ZkStateReader.BASE_URL_PROP, replica.getStr(ZkStateReader.BASE_URL_PROP));
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+ overseer.offerStateUpdate(Utils.toJSON(m));
}
void checkRequired(ZkNodeProps message, String... props) {
@@ -475,7 +506,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
// Actually queue the migration command.
firstLoop = false;
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+ overseer.offerStateUpdate(Utils.toJSON(m));
}
timeout.sleep(100);
}
@@ -584,7 +615,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
- public static void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
+ public void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
String asyncId, Map<String, String> requestMap, String adminPath,
ZkStateReader zkStateReader) {
if (asyncId != null) {
@@ -640,7 +671,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
}
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+ overseer.offerStateUpdate(Utils.toJSON(message));
TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
boolean areChangesVisible = true;
@@ -680,8 +711,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
}
Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
+ assert coreNames.size() > 0;
Map<String, Replica> result = new HashMap<>();
- TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+ TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
while (true) {
DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
for (String coreName : coreNames) {
@@ -791,7 +823,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
String collectionName = message.getStr(NAME);
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollection(collectionName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index d100ce0..a63b292 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -18,6 +18,20 @@
package org.apache.solr.cloud.api.collections;
+import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.ArrayList;
@@ -33,7 +47,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.overseer.OverseerAction;
@@ -60,20 +73,6 @@ import org.apache.solr.handler.component.ShardHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -89,7 +88,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
String restoreCollectionName = message.getStr(COLLECTION_PROP);
String backupName = message.getStr(NAME); // of backup
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
String asyncId = message.getStr(ASYNC);
String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
Map<String, String> requestMap = new HashMap<>();
@@ -209,8 +208,6 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-
//Mark all shards in CONSTRUCTION STATE while we restore the data
{
//TODO might instead createCollection accept an initial state? Is there a race?
@@ -220,7 +217,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
}
propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
- inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
}
// TODO how do we leverage the RULE / SNITCH logic in createCollection?
@@ -323,7 +320,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
for (Slice shard : restoreCollection.getSlices()) {
propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
}
- inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+ ocmh.overseer.offerStateUpdate((Utils.toJSON(new ZkNodeProps(propMap))));
}
if (totalReplicasPerShard > 1) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index aa4909d..24a52ea 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -30,7 +30,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
@@ -249,8 +248,8 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
propMap.put("shard_parent_node", nodeName);
propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
- inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
// wait until we are able to see the new shard in cluster state
ocmh.waitForNewShard(collectionName, subSlice);
@@ -281,7 +280,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
}
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseer.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient());
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
@@ -412,7 +411,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
ZkStateReader.NODE_NAME_PROP, subShardNodeName,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
HashMap<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -446,7 +445,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
if (leaderZnodeStat == null || ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
// put sub-shards in recovery_failed state
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
for (String subSlice : subSlices) {
@@ -454,7 +453,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
if (leaderZnodeStat == null) {
// the leader is not live anymore, fail the split!
@@ -473,8 +472,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
if (repFactor == 1) {
// switch sub shard states to 'active'
- log.debug("Replication factor is 1 so switching shard states");
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+ log.info("Replication factor is 1 so switching shard states");
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(slice.get(), Slice.State.INACTIVE.toString());
@@ -483,10 +481,9 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
} else {
- log.debug("Requesting shard state be set to 'recovery'");
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+ log.info("Requesting shard state be set to 'recovery'");
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
for (String subSlice : subSlices) {
@@ -494,7 +491,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
}
t = timings.sub("createCoresForReplicas");
@@ -590,7 +587,6 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
// set already created sub shards states to CONSTRUCTION - this prevents them
// from entering into RECOVERY or ACTIVE (SOLR-9455)
- DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
final Map<String, Object> propMap = new HashMap<>();
boolean sendUpdateState = false;
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
@@ -618,7 +614,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
if (sendUpdateState) {
try {
ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
} catch (Exception e) {
// don't give up yet - just log the error, we may still be able to clean up
log.warn("Cleanup failed after failed split of " + collectionName + "/" + parentShard + ": (slice state changes)", e);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index ddb4913..97e855c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
@@ -62,7 +63,7 @@ public class NodeLostTrigger extends TriggerBase {
public void init() throws Exception {
super.init();
lastLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
- log.debug("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
+ log.info("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
// pick up lost nodes for which marker paths were created
try {
List<String> lost = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
@@ -147,7 +148,7 @@ public class NodeLostTrigger extends TriggerBase {
}
Set<String> newLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
- log.debug("Running NodeLostTrigger: {} with currently live nodes: {}", name, newLiveNodes.size());
+ log.info("Running NodeLostTrigger: {} with currently live nodes: {} and last live nodes: {}", name, newLiveNodes.size(), lastLiveNodes.size());
// have any nodes that we were tracking been added to the cluster?
// if so, remove them from the tracking map
@@ -158,7 +159,7 @@ public class NodeLostTrigger extends TriggerBase {
Set<String> copyOfLastLiveNodes = new HashSet<>(lastLiveNodes);
copyOfLastLiveNodes.removeAll(newLiveNodes);
copyOfLastLiveNodes.forEach(n -> {
- log.debug("Tracking lost node: {}", n);
+ log.info("Tracking lost node: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
});
@@ -170,7 +171,8 @@ public class NodeLostTrigger extends TriggerBase {
String nodeName = entry.getKey();
Long timeRemoved = entry.getValue();
long now = cloudManager.getTimeSource().getTimeNs();
- if (TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
+ long te = TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS);
+ if (te >= getWaitForSecond()) {
nodeNames.add(nodeName);
times.add(timeRemoved);
}
@@ -197,6 +199,8 @@ public class NodeLostTrigger extends TriggerBase {
}
}
lastLiveNodes = new HashSet<>(newLiveNodes);
+ } catch (AlreadyClosedException e) {
+
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeLostTrigger", e);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 052b4c4..6288e40 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -29,12 +29,12 @@ import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.IOUtils;
@@ -135,6 +135,8 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
log.debug("Adding .auto_add_replicas and .scheduled_maintenance triggers");
cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(updatedConfig), updatedConfig.getZkVersion());
break;
+ } catch (AlreadyClosedException e) {
+ break;
} catch (BadVersionException bve) {
// somebody else has changed the configuration so we must retry
} catch (InterruptedException e) {
@@ -178,7 +180,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
// must check for close here before we await on the condition otherwise we can only be woken up on interruption
if (isClosed) {
- log.warn("OverseerTriggerThread has been closed, exiting.");
+ log.info("OverseerTriggerThread has been closed, exiting.");
break;
}
@@ -190,7 +192,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
// are we closed?
if (isClosed) {
- log.warn("OverseerTriggerThread woken up but we are closed, exiting.");
+ log.info("OverseerTriggerThread woken up but we are closed, exiting.");
break;
}
@@ -211,7 +213,6 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
- log.warn("Interrupted", e);
break;
}
@@ -240,6 +241,8 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
}
try {
scheduledTriggers.add(entry.getValue());
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
log.warn("Exception initializing trigger " + entry.getKey() + ", configuration ignored", e);
}
@@ -275,6 +278,8 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
});
} catch (NoSuchElementException e) {
// ignore
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
log.warn("Error removing old nodeAdded markers", e);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
index 5e25542..e5afd9f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
@@ -151,8 +151,8 @@ public class ScheduledTrigger extends TriggerBase {
public void run() {
synchronized (this) {
if (isClosed) {
- log.warn("ScheduledTrigger ran but was already closed");
- throw new RuntimeException("Trigger has been closed");
+ log.debug("ScheduledTrigger ran but was already closed");
+ return;
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 7c3cbb0..b9cd9f1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -42,7 +42,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
@@ -51,6 +50,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.RequestStatusResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.Stats;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
@@ -205,7 +205,7 @@ public class ScheduledTriggers implements Closeable {
try {
st = new TriggerWrapper(newTrigger, cloudManager, queueStats);
} catch (Exception e) {
- if (isClosed) {
+ if (isClosed || e instanceof AlreadyClosedException) {
throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
}
if (cloudManager.isClosed()) {
@@ -559,7 +559,7 @@ public class ScheduledTriggers implements Closeable {
// fire a trigger only if an action is not pending
// note this is not fool proof e.g. it does not prevent an action being executed while a trigger
// is still executing. There is additional protection against that scenario in the event listener.
- if (!hasPendingActions.get()) {
+ if (!hasPendingActions.get()) {
// this synchronization is usually never under contention
// but the only reason to have it here is to ensure that when the set-properties API is used
// to change the schedule delay, we can safely cancel the old scheduled task
@@ -567,28 +567,37 @@ public class ScheduledTriggers implements Closeable {
// execution of the same trigger instance
synchronized (TriggerWrapper.this) {
// replay accumulated events on first run, if any
- if (replay) {
- TriggerEvent event;
- // peek first without removing - we may crash before calling the listener
- while ((event = queue.peekEvent()) != null) {
- // override REPLAYING=true
- event.getProperties().put(TriggerEvent.REPLAYING, true);
- if (! trigger.getProcessor().process(event)) {
- log.error("Failed to re-play event, discarding: " + event);
+
+ try {
+ if (replay) {
+ TriggerEvent event;
+ // peek first without removing - we may crash before calling the listener
+ while ((event = queue.peekEvent()) != null) {
+ // override REPLAYING=true
+ event.getProperties().put(TriggerEvent.REPLAYING, true);
+ if (!trigger.getProcessor().process(event)) {
+ log.error("Failed to re-play event, discarding: " + event);
+ }
+ queue.pollEvent(); // always remove it from queue
}
- queue.pollEvent(); // always remove it from queue
- }
- // now restore saved state to possibly generate new events from old state on the first run
- try {
- trigger.restoreState();
- } catch (Exception e) {
- // log but don't throw - see below
- log.error("Error restoring trigger state " + trigger.getName(), e);
+ // now restore saved state to possibly generate new events from old state on the first run
+ try {
+ trigger.restoreState();
+ } catch (Exception e) {
+ // log but don't throw - see below
+ log.error("Error restoring trigger state " + trigger.getName(), e);
+ }
+ replay = false;
}
- replay = false;
+ } catch (AlreadyClosedException e) {
+
+ } catch (Exception e) {
+ log.error("Unexpected exception from trigger: " + trigger.getName(), e);
}
try {
trigger.run();
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
// log but do not propagate exception because an exception thrown from a scheduled operation
// will suppress future executions
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
index 214552e..93fb353 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
@@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
@@ -239,7 +240,9 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
stateManager.createData(path, data, CreateMode.PERSISTENT);
}
lastState = state;
- } catch (InterruptedException | BadVersionException | AlreadyExistsException | IOException | KeeperException e) {
+ } catch (AlreadyExistsException e) {
+
+ } catch (InterruptedException | BadVersionException | IOException | KeeperException e) {
log.warn("Exception updating trigger state '" + path + "'", e);
}
}
@@ -253,6 +256,8 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
VersionedData versionedData = stateManager.getData(path);
data = versionedData.getData();
}
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
log.warn("Exception getting trigger state '" + path + "'", e);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
index 057d792..e5c6f5b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.Stats;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.TimeSource;
@@ -78,7 +79,11 @@ public class TriggerEventQueue {
continue;
}
}
- } catch (Exception e) {
+ }
+ catch (AlreadyClosedException e) {
+
+ }
+ catch (Exception e) {
log.warn("Exception peeking queue of trigger " + triggerName, e);
}
return null;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/core/CloudConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CloudConfig.java b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
index 6248b45..15ccf3c 100644
--- a/solr/core/src/java/org/apache/solr/core/CloudConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
@@ -124,10 +124,10 @@ public class CloudConfig {
public static class CloudConfigBuilder {
- private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 15000;
+ private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 45000;
private static final int DEFAULT_LEADER_VOTE_WAIT = 180000; // 3 minutes
private static final int DEFAULT_LEADER_CONFLICT_RESOLVE_WAIT = 180000;
- private static final int DEFAULT_CREATE_COLLECTION_ACTIVE_WAIT = 30; // 30 seconds
+ private static final int DEFAULT_CREATE_COLLECTION_ACTIVE_WAIT = 45; // 45 seconds
private static final boolean DEFAULT_CREATE_COLLECTION_CHECK_LEADER_ACTIVE = false;
private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION = 120000;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 83384fb..54f9114 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -16,6 +16,22 @@
*/
package org.apache.solr.core;
+import static java.util.Objects.requireNonNull;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTOSCALING_HISTORY_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.HEALTH_CHECK_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_HISTORY_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
+import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
@@ -35,10 +51,9 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.config.Lookup;
@@ -58,6 +73,7 @@ import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.DocCollection;
@@ -106,24 +122,13 @@ import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.stats.MetricUtils;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.Objects.requireNonNull;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTOSCALING_HISTORY_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.HEALTH_CHECK_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_HISTORY_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_STATUS_PATH;
-import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
/**
*
@@ -148,32 +153,32 @@ public class CoreContainer {
protected final Map<String, CoreLoadFailure> coreInitFailures = new ConcurrentHashMap<>();
- protected CoreAdminHandler coreAdminHandler = null;
- protected CollectionsHandler collectionsHandler = null;
- protected HealthCheckHandler healthCheckHandler = null;
+ protected volatile CoreAdminHandler coreAdminHandler = null;
+ protected volatile CollectionsHandler collectionsHandler = null;
+ protected volatile HealthCheckHandler healthCheckHandler = null;
- private InfoHandler infoHandler;
- protected ConfigSetsHandler configSetsHandler = null;
+ private volatile InfoHandler infoHandler;
+ protected volatile ConfigSetsHandler configSetsHandler = null;
- private PKIAuthenticationPlugin pkiAuthenticationPlugin;
+ private volatile PKIAuthenticationPlugin pkiAuthenticationPlugin;
- protected Properties containerProperties;
+ protected volatile Properties containerProperties;
- private ConfigSetService coreConfigService;
+ private volatile ConfigSetService coreConfigService;
- protected ZkContainer zkSys = new ZkContainer();
- protected ShardHandlerFactory shardHandlerFactory;
+ protected final ZkContainer zkSys = new ZkContainer();
+ protected volatile ShardHandlerFactory shardHandlerFactory;
- private UpdateShardHandler updateShardHandler;
+ private volatile UpdateShardHandler updateShardHandler;
- private ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
+ private volatile ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
new DefaultSolrThreadFactory("coreContainerWorkExecutor") );
private final OrderedExecutor replayUpdatesExecutor;
- protected LogWatcher logging = null;
+ protected volatile LogWatcher logging = null;
- private CloserThread backgroundCloser = null;
+ private volatile CloserThread backgroundCloser = null;
protected final NodeConfig cfg;
protected final SolrResourceLoader loader;
@@ -181,33 +186,33 @@ public class CoreContainer {
protected final CoresLocator coresLocator;
- private String hostName;
+ private volatile String hostName;
private final BlobRepository blobRepository = new BlobRepository(this);
- private PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
+ private volatile PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
- private boolean asyncSolrCoreLoad;
+ private volatile boolean asyncSolrCoreLoad;
- protected SecurityConfHandler securityConfHandler;
+ protected volatile SecurityConfHandler securityConfHandler;
- private SecurityPluginHolder<AuthorizationPlugin> authorizationPlugin;
+ private volatile SecurityPluginHolder<AuthorizationPlugin> authorizationPlugin;
- private SecurityPluginHolder<AuthenticationPlugin> authenticationPlugin;
+ private volatile SecurityPluginHolder<AuthenticationPlugin> authenticationPlugin;
- private BackupRepositoryFactory backupRepoFactory;
+ private volatile BackupRepositoryFactory backupRepoFactory;
- protected SolrMetricManager metricManager;
+ protected volatile SolrMetricManager metricManager;
- protected String metricTag = Integer.toHexString(hashCode());
+ protected volatile String metricTag = Integer.toHexString(hashCode());
protected MetricsHandler metricsHandler;
- protected MetricsHistoryHandler metricsHistoryHandler;
+ protected volatile MetricsHistoryHandler metricsHistoryHandler;
- protected MetricsCollectorHandler metricsCollectorHandler;
+ protected volatile MetricsCollectorHandler metricsCollectorHandler;
- protected AutoscalingHistoryHandler autoscalingHistoryHandler;
+ protected volatile AutoscalingHistoryHandler autoscalingHistoryHandler;
// Bits for the state variable.
@@ -216,7 +221,7 @@ public class CoreContainer {
public final static long INITIAL_CORE_LOAD_COMPLETE = 0x4L;
private volatile long status = 0L;
- protected AutoScalingHandler autoScalingHandler;
+ protected volatile AutoScalingHandler autoScalingHandler;
private enum CoreInitFailedAction { fromleader, none }
@@ -759,6 +764,7 @@ public class CoreContainer {
name = getZkController().getNodeName();
cloudManager = getZkController().getSolrCloudManager();
client = new CloudSolrClient.Builder(Collections.singletonList(getZkController().getZkServerAddress()), Optional.empty())
+ .withSocketTimeout(30000).withConnectionTimeout(15000)
.withHttpClient(updateShardHandler.getDefaultHttpClient()).build();
} else {
name = getNodeConfig().getNodeName();
@@ -818,53 +824,40 @@ public class CoreContainer {
return isShutDown;
}
- /**
- * Stops all cores.
- */
public void shutdown() {
log.info("Shutting down CoreContainer instance="
+ System.identityHashCode(this));
- isShutDown = true;
-
- ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
- replayUpdatesExecutor.shutdownAndAwaitTermination();
+ ForkJoinPool customThreadPool = new ForkJoinPool(6);
- if (metricsHistoryHandler != null) {
- IOUtils.closeQuietly(metricsHistoryHandler.getSolrClient());
- metricsHistoryHandler.close();
- }
+ isShutDown = true;
+ try {
+ if (isZooKeeperAware()) {
+ cancelCoreRecoveries();
- if (metricManager != null) {
- metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node));
- metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jvm));
- metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jetty));
+ if (isZooKeeperAware()) {
+ cancelCoreRecoveries();
+ try {
+ zkSys.zkController.removeEphemeralLiveNode();
+ } catch (AlreadyClosedException | SessionExpiredException | ConnectionLossException e) {
- metricManager.unregisterGauges(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node), metricTag);
- metricManager.unregisterGauges(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jvm), metricTag);
- metricManager.unregisterGauges(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jetty), metricTag);
- }
+ } catch (Exception e) {
+ log.warn("Error removing live node. Continuing to close CoreContainer", e);
+ }
+ }
- if (isZooKeeperAware()) {
- cancelCoreRecoveries();
- zkSys.zkController.publishNodeAsDown(zkSys.zkController.getNodeName());
- try {
- zkSys.zkController.removeEphemeralLiveNode();
- } catch (Exception e) {
- log.warn("Error removing live node. Continuing to close CoreContainer", e);
- }
- if (metricManager != null) {
- metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.cluster));
+ try {
+ if (zkSys.zkController.getZkClient().getConnectionManager().isConnected()) {
+ log.info("Publish this node as DOWN...");
+ zkSys.zkController.publishNodeAsDown(zkSys.zkController.getNodeName());
+ }
+ } catch (Exception e) {
+ log.warn("Error publishing nodes as down. Continuing to close CoreContainer", e);
+ }
}
- }
- try {
- if (coreAdminHandler != null) coreAdminHandler.shutdown();
- } catch (Exception e) {
- log.warn("Error shutting down CoreAdminHandler. Continuing to close CoreContainer.", e);
- }
-
- try {
+ ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
+
// First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
synchronized (solrCores.getModifyLock()) {
solrCores.getModifyLock().notifyAll(); // wake up anyone waiting
@@ -896,27 +889,77 @@ public class CoreContainer {
synchronized (solrCores.getModifyLock()) {
solrCores.getModifyLock().notifyAll(); // wake up the thread
}
+
+ customThreadPool.submit(() -> Collections.singleton(replayUpdatesExecutor).parallelStream().forEach(c -> {
+ c.shutdownAndAwaitTermination();
+ }));
+
+ if (metricsHistoryHandler != null) {
+ customThreadPool.submit(() -> Collections.singleton(metricsHistoryHandler).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+ customThreadPool.submit(() -> Collections.singleton(metricsHistoryHandler.getSolrClient()).parallelStream().forEach(c -> {
+ IOUtils.closeQuietly(c);
+ }));
+ }
+
+ if (metricManager != null) {
+ metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node));
+ metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jvm));
+ metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jetty));
+
+ metricManager.unregisterGauges(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node), metricTag);
+ metricManager.unregisterGauges(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jvm), metricTag);
+ metricManager.unregisterGauges(SolrMetricManager.getRegistryName(SolrInfoBean.Group.jetty), metricTag);
+ }
+
+ if (isZooKeeperAware()) {
+ cancelCoreRecoveries();
+
+ if (metricManager != null) {
+ metricManager.closeReporters(SolrMetricManager.getRegistryName(SolrInfoBean.Group.cluster));
+ }
+ }
+
+ try {
+ if (coreAdminHandler != null) {
+ customThreadPool.submit(() -> Collections.singleton(coreAdminHandler).parallelStream().forEach(c -> {
+ c.shutdown();
+ }));
+ }
+ } catch (Exception e) {
+ log.warn("Error shutting down CoreAdminHandler. Continuing to close CoreContainer.", e);
+ }
} finally {
try {
if (shardHandlerFactory != null) {
- shardHandlerFactory.close();
+ customThreadPool.submit(() -> Collections.singleton(shardHandlerFactory).parallelStream().forEach(c -> {
+ c.close();
+ }));
}
} finally {
try {
if (updateShardHandler != null) {
- updateShardHandler.close();
+ customThreadPool.submit(() -> Collections.singleton(updateShardHandler).parallelStream().forEach(c -> {
+ c.close();
+ }));
}
} finally {
- // we want to close zk stuff last
- zkSys.close();
+ try {
+ // we want to close zk stuff last
+ zkSys.close();
+ } finally {
+ ExecutorUtil.shutdownAndAwaitTermination(customThreadPool);
+ }
}
+
}
}
// It should be safe to close the authorization plugin at this point.
try {
- if(authorizationPlugin != null) {
+ if (authorizationPlugin != null) {
authorizationPlugin.plugin.close();
}
} catch (IOException e) {
@@ -925,7 +968,7 @@ public class CoreContainer {
// It should be safe to close the authentication plugin at this point.
try {
- if(authenticationPlugin != null) {
+ if (authenticationPlugin != null) {
authenticationPlugin.plugin.close();
authenticationPlugin = null;
}
@@ -1384,6 +1427,9 @@ public class CoreContainer {
* @param name the name of the SolrCore to reload
*/
public void reload(String name) {
+ if (isShutDown) {
+ throw new AlreadyClosedException();
+ }
SolrCore core = solrCores.getCoreFromAnyList(name, false);
if (core != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/core/SolrCore.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 6e13039..e66ca89 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -162,6 +162,7 @@ import org.apache.solr.util.NumberUtils;
import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.PropertiesOutputStream;
import org.apache.solr.util.RefCounted;
+import org.apache.solr.util.TestInjection;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.apache.solr.util.plugin.SolrCoreAware;
@@ -764,10 +765,14 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
// Create the index if it doesn't exist.
if (!indexExists) {
log.debug("{}Solr index directory '{}' doesn't exist. Creating new index...", logid, indexDir);
-
- SolrIndexWriter writer = SolrIndexWriter.create(this, "SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
+ SolrIndexWriter writer = null;
+ try {
+ writer = SolrIndexWriter.create(this, "SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
- writer.close();
+ } finally {
+ IOUtils.closeQuietly(writer);
+ }
+
}
cleanupOldIndexDirectories(reload);
@@ -992,6 +997,33 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
resourceLoader.inform(resourceLoader);
resourceLoader.inform(this); // last call before the latch is released.
this.updateHandler.informEventListeners(this);
+
+ infoRegistry.put("core", this);
+
+ // register any SolrInfoMBeans SolrResourceLoader initialized
+ //
+ // this must happen after the latch is released, because a JMX server impl may
+ // choose to block on registering until properties can be fetched from an MBean,
+ // and a SolrCoreAware MBean may have properties that depend on getting a Searcher
+ // from the core.
+ resourceLoader.inform(infoRegistry);
+
+ // Allow the directory factory to report metrics
+ if (directoryFactory instanceof SolrMetricProducer) {
+ ((SolrMetricProducer) directoryFactory).initializeMetrics(metricManager, coreMetricManager.getRegistryName(),
+ metricTag, "directoryFactory");
+ }
+
+ // seed version buckets with max from index during core initialization ... requires a searcher!
+ seedVersionBuckets();
+
+ bufferUpdatesIfConstructing(coreDescriptor);
+
+ this.ruleExpiryLock = new ReentrantLock();
+ this.snapshotDelLock = new ReentrantLock();
+
+ registerConfListener();
+
} catch (Throwable e) {
// release the latch, otherwise we block trying to do the close. This
// should be fine, since counting down on a latch of 0 is still fine
@@ -1016,31 +1048,6 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
// allow firstSearcher events to fire and make sure it is released
latch.countDown();
}
-
- infoRegistry.put("core", this);
-
- // register any SolrInfoMBeans SolrResourceLoader initialized
- //
- // this must happen after the latch is released, because a JMX server impl may
- // choose to block on registering until properties can be fetched from an MBean,
- // and a SolrCoreAware MBean may have properties that depend on getting a Searcher
- // from the core.
- resourceLoader.inform(infoRegistry);
-
- // Allow the directory factory to report metrics
- if (directoryFactory instanceof SolrMetricProducer) {
- ((SolrMetricProducer)directoryFactory).initializeMetrics(metricManager, coreMetricManager.getRegistryName(), metricTag, "directoryFactory");
- }
-
- // seed version buckets with max from index during core initialization ... requires a searcher!
- seedVersionBuckets();
-
- bufferUpdatesIfConstructing(coreDescriptor);
-
- this.ruleExpiryLock = new ReentrantLock();
- this.snapshotDelLock = new ReentrantLock();
-
- registerConfListener();
assert ObjectReleaseTracker.track(this);
}
@@ -1999,7 +2006,7 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
*/
public RefCounted<SolrIndexSearcher> openNewSearcher(boolean updateHandlerReopens, boolean realtime) {
if (isClosed()) { // catch some errors quicker
- throw new SolrException(ErrorCode.SERVER_ERROR, "openNewSearcher called on closed core");
+ throw new SolrCoreState.CoreIsClosedException();
}
SolrIndexSearcher tmp;
@@ -2372,7 +2379,7 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
return returnSearcher ? newSearchHolder : null;
} catch (Exception e) {
- if (e instanceof SolrException) throw (SolrException)e;
+ if (e instanceof RuntimeException) throw (RuntimeException)e;
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} finally {
@@ -2491,6 +2498,7 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
// even in the face of errors.
onDeckSearchers--;
searcherLock.notifyAll();
+ assert TestInjection.injectSearcherHooks(getCoreDescriptor() != null && getCoreDescriptor().getCloudDescriptor() != null ? getCoreDescriptor().getCloudDescriptor().getCollectionName() : null);
}
}
}
@@ -3008,7 +3016,7 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
int solrConfigversion, overlayVersion, managedSchemaVersion = 0;
SolrConfig cfg = null;
try (SolrCore solrCore = cc.solrCores.getCoreFromAnyList(coreName, true)) {
- if (solrCore == null || solrCore.isClosed()) return;
+ if (solrCore == null || solrCore.isClosed() || solrCore.getCoreContainer().isShutDown()) return;
cfg = solrCore.getSolrConfig();
solrConfigversion = solrCore.getSolrConfig().getOverlay().getZnodeVersion();
overlayVersion = solrCore.getSolrConfig().getZnodeVersion();
@@ -3042,7 +3050,7 @@ public final class SolrCore implements SolrInfoBean, SolrMetricProducer, Closeab
}
//some files in conf directory may have other than managedschema, overlay, params
try (SolrCore solrCore = cc.solrCores.getCoreFromAnyList(coreName, true)) {
- if (solrCore == null || solrCore.isClosed()) return;
+ if (solrCore == null || solrCore.isClosed() || cc.isShutDown()) return;
for (Runnable listener : solrCore.confListeners) {
try {
listener.run();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactory.java b/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactory.java
index b3b8cf0..7c83ec8 100644
--- a/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactory.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
public abstract class TransientSolrCoreCacheFactory {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private CoreContainer coreContainer = null;
+ private volatile CoreContainer coreContainer = null;
public abstract TransientSolrCoreCache getTransientSolrCoreCache();
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactoryDefault.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactoryDefault.java b/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactoryDefault.java
index 722ab9c..0d56483 100644
--- a/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactoryDefault.java
+++ b/solr/core/src/java/org/apache/solr/core/TransientSolrCoreCacheFactoryDefault.java
@@ -18,7 +18,7 @@ package org.apache.solr.core;
public class TransientSolrCoreCacheFactoryDefault extends TransientSolrCoreCacheFactory {
- TransientSolrCoreCache transientSolrCoreCache = null;
+ volatile TransientSolrCoreCache transientSolrCoreCache = null;
@Override
public TransientSolrCoreCache getTransientSolrCoreCache() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/core/ZkContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 34e5764..ae9c54a 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -31,6 +31,7 @@ import java.util.function.Predicate;
import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
import org.apache.solr.cloud.SolrZkServer;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkConfigManager;
@@ -174,24 +175,31 @@ public class ZkContainer {
return zkRun.substring(0, zkRun.lastIndexOf('/'));
}
- public static Predicate<CoreDescriptor> testing_beforeRegisterInZk;
+ public static volatile Predicate<CoreDescriptor> testing_beforeRegisterInZk;
public void registerInZk(final SolrCore core, boolean background, boolean skipRecovery) {
+ CoreDescriptor cd = core.getCoreDescriptor(); // save this here - the core may not have it later
Runnable r = () -> {
MDCLoggingContext.setCore(core);
try {
try {
if (testing_beforeRegisterInZk != null) {
- testing_beforeRegisterInZk.test(core.getCoreDescriptor());
+ testing_beforeRegisterInZk.test(cd);
+ }
+ if (!core.getCoreContainer().isShutDown()) {
+ zkController.register(core.getName(), cd, skipRecovery);
}
- zkController.register(core.getName(), core.getCoreDescriptor(), skipRecovery);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
SolrException.log(log, "", e);
+ } catch (KeeperException e) {
+ SolrException.log(log, "", e);
+ } catch (AlreadyClosedException e) {
+
} catch (Exception e) {
try {
- zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
+ zkController.publish(cd, Replica.State.DOWN);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
log.error("", e1);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
index 8ec3c8b..fc5a048 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
@@ -97,6 +97,7 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
String targetCollection = params.get(CdcrParams.TARGET_COLLECTION_PARAM);
CloudSolrClient client = new Builder(Collections.singletonList(zkHost), Optional.empty())
+ .withSocketTimeout(30000).withConnectionTimeout(15000)
.sendUpdatesOnlyToShardLeaders()
.build();
client.setDefaultCollection(targetCollection);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 32e8651..b8a476b 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -222,7 +222,7 @@ public class IndexFetcher {
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
- return HttpClientUtil.createClient(httpClientParams, core.getCoreContainer().getUpdateShardHandler().getDefaultConnectionManager(), true);
+ return HttpClientUtil.createClient(httpClientParams, core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyConnectionManager(), true);
}
public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {