Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 11:56:55 UTC
[8/9] lucene-solr:master: SOLR-11285: Simulation framework for autoscaling.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index 8f65255..9732616 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -26,9 +26,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
import org.apache.solr.cloud.overseer.OverseerAction;
@@ -79,56 +81,26 @@ public class SplitShardCmd implements Cmd {
}
public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
- String collectionName = message.getStr("collection");
- String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+ String collectionName = message.getStr(CoreAdminParams.COLLECTION);
log.info("Split shard invoked");
ZkStateReader zkStateReader = ocmh.zkStateReader;
zkStateReader.forceUpdateCollection(collectionName);
+ AtomicReference<String> slice = new AtomicReference<>();
+ slice.set(message.getStr(ZkStateReader.SHARD_ID_PROP));
String splitKey = message.getStr("split.key");
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
-
DocCollection collection = clusterState.getCollection(collectionName);
- DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
- PolicyHelper.SessionWrapper sessionWrapper = null;
-
-
- Slice parentSlice;
- if (slice == null) {
- if (router instanceof CompositeIdRouter) {
- Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
- if (searchSlices.isEmpty()) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
- }
- if (searchSlices.size() > 1) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
- }
- parentSlice = searchSlices.iterator().next();
- slice = parentSlice.getName();
- log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
- } else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
- + router.getClass().getName());
- }
- } else {
- parentSlice = collection.getSlice(slice);
- }
+ PolicyHelper.SessionWrapper sessionWrapper = null;
- if (parentSlice == null) {
- // no chance of the collection being null because ClusterState#getCollection(String) would have thrown
- // an exception already
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
- }
+ Slice parentSlice = getParentSlice(clusterState, collectionName, slice, splitKey);
// find the leader for the shard
Replica parentShardLeader = null;
try {
- parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
+ parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice.get(), 10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -140,80 +112,13 @@ public class SplitShardCmd implements Cmd {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
}
- DocRouter.Range range = parentSlice.getRange();
- if (range == null) {
- range = new PlainIdRouter().fullRange();
- }
+ List<DocRouter.Range> subRanges = new ArrayList<>();
+ List<String> subSlices = new ArrayList<>();
+ List<String> subShardNames = new ArrayList<>();
- List<DocRouter.Range> subRanges = null;
- String rangesStr = message.getStr(CoreAdminParams.RANGES);
- if (rangesStr != null) {
- String[] ranges = rangesStr.split(",");
- if (ranges.length == 0 || ranges.length == 1) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
- } else {
- subRanges = new ArrayList<>(ranges.length);
- for (int i = 0; i < ranges.length; i++) {
- String r = ranges[i];
- try {
- subRanges.add(DocRouter.DEFAULT.fromString(r));
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
- }
- if (!subRanges.get(i).isSubsetOf(range)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
- }
- }
- List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
- Collections.sort(temp);
- if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
- }
- for (int i = 1; i < temp.size(); i++) {
- if (temp.get(i - 1).max + 1 != temp.get(i).min) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
- + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
- }
- }
- }
- } else if (splitKey != null) {
- if (router instanceof CompositeIdRouter) {
- CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
- subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
- if (subRanges.size() == 1) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
- + " has a hash range that is exactly equal to hash range of shard: " + slice);
- }
- for (DocRouter.Range subRange : subRanges) {
- if (subRange.min == subRange.max) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
- }
- }
- log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
- rangesStr = "";
- for (int i = 0; i < subRanges.size(); i++) {
- DocRouter.Range subRange = subRanges.get(i);
- rangesStr += subRange.toString();
- if (i < subRanges.size() - 1) rangesStr += ',';
- }
- }
- } else {
- // todo: fixed to two partitions?
- subRanges = router.partitionRange(2, range);
- }
+ String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames);
try {
- List<String> subSlices = new ArrayList<>(subRanges.size());
- List<String> subShardNames = new ArrayList<>(subRanges.size());
- String nodeName = parentShardLeader.getNodeName();
- for (int i = 0; i < subRanges.size(); i++) {
- String subSlice = slice + "_" + i;
- subSlices.add(subSlice);
- String subShardName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), collection, subSlice, Replica.Type.NRT);
- subShardNames.add(subShardName);
- }
boolean oldShardsDeleted = false;
for (String subSlice : subSlices) {
@@ -252,6 +157,7 @@ public class SplitShardCmd implements Cmd {
final String asyncId = message.getStr(ASYNC);
Map<String, String> requestMap = new HashMap<>();
+ String nodeName = parentShardLeader.getNodeName();
for (int i = 0; i < subRanges.size(); i++) {
String subSlice = subSlices.get(i);
@@ -300,6 +206,8 @@ public class SplitShardCmd implements Cmd {
ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
}
+ ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
for (String subShardName : subShardNames) {
@@ -388,7 +296,7 @@ public class SplitShardCmd implements Cmd {
// TODO: change this to handle sharding a slice into > 2 sub-shards.
- List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh,
+ List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh.cloudManager,
clusterState,
new ArrayList<>(clusterState.getLiveNodes()),
collectionName,
@@ -401,15 +309,15 @@ public class SplitShardCmd implements Cmd {
for (ReplicaPosition replicaPosition : replicaPositions) {
String sliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node;
- String shardName = collectionName + "_" + sliceName + "_replica" + (replicaPosition.index);
+ String solrCoreName = collectionName + "_" + sliceName + "_replica" + (replicaPosition.index);
- log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
+ log.info("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
+ collectionName + " on " + subShardNodeName);
ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.SHARD_ID_PROP, sliceName,
- ZkStateReader.CORE_NAME_PROP, shardName,
+ ZkStateReader.CORE_NAME_PROP, solrCoreName,
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
ZkStateReader.NODE_NAME_PROP, subShardNodeName,
@@ -421,7 +329,7 @@ public class SplitShardCmd implements Cmd {
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, sliceName);
propMap.put("node", subShardNodeName);
- propMap.put(CoreAdminParams.NAME, shardName);
+ propMap.put(CoreAdminParams.NAME, solrCoreName);
// copy over property params:
for (String key : message.keySet()) {
if (key.startsWith(COLL_PROP_PREFIX)) {
@@ -478,7 +386,7 @@ public class SplitShardCmd implements Cmd {
DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
- propMap.put(slice, Slice.State.INACTIVE.toString());
+ propMap.put(slice.get(), Slice.State.INACTIVE.toString());
for (String subSlice : subSlices) {
propMap.put(subSlice, Slice.State.ACTIVE.toString());
}
@@ -507,7 +415,7 @@ public class SplitShardCmd implements Cmd {
log.info("Successfully created all replica shards for all sub-slices " + subSlices);
- ocmh.commit(results, slice, parentShardLeader);
+ ocmh.commit(results, slice.get(), parentShardLeader);
return true;
} catch (SolrException e) {
@@ -519,4 +427,116 @@ public class SplitShardCmd implements Cmd {
if (sessionWrapper != null) sessionWrapper.release();
}
}
+
+ public static Slice getParentSlice(ClusterState clusterState, String collectionName, AtomicReference<String> slice, String splitKey) {
+ DocCollection collection = clusterState.getCollection(collectionName);
+ DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
+
+ Slice parentSlice;
+
+ if (slice.get() == null) {
+ if (router instanceof CompositeIdRouter) {
+ Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
+ if (searchSlices.isEmpty()) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
+ }
+ if (searchSlices.size() > 1) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
+ }
+ parentSlice = searchSlices.iterator().next();
+ slice.set(parentSlice.getName());
+ log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
+ + router.getClass().getName());
+ }
+ } else {
+ parentSlice = collection.getSlice(slice.get());
+ }
+
+ if (parentSlice == null) {
+ // no chance of the collection being null because ClusterState#getCollection(String) would have thrown
+ // an exception already
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
+ }
+ return parentSlice;
+ }
+
+ public static String fillRanges(SolrCloudManager cloudManager, ZkNodeProps message, DocCollection collection, Slice parentSlice,
+ List<DocRouter.Range> subRanges, List<String> subSlices, List<String> subShardNames) {
+ String splitKey = message.getStr("split.key");
+ DocRouter.Range range = parentSlice.getRange();
+ if (range == null) {
+ range = new PlainIdRouter().fullRange();
+ }
+ DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
+
+ String rangesStr = message.getStr(CoreAdminParams.RANGES);
+ if (rangesStr != null) {
+ String[] ranges = rangesStr.split(",");
+ if (ranges.length == 0 || ranges.length == 1) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
+ } else {
+ for (int i = 0; i < ranges.length; i++) {
+ String r = ranges[i];
+ try {
+ subRanges.add(DocRouter.DEFAULT.fromString(r));
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
+ }
+ if (!subRanges.get(i).isSubsetOf(range)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
+ }
+ }
+ List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
+ Collections.sort(temp);
+ if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
+ }
+ for (int i = 1; i < temp.size(); i++) {
+ if (temp.get(i - 1).max + 1 != temp.get(i).min) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
+ + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
+ }
+ }
+ }
+ } else if (splitKey != null) {
+ if (router instanceof CompositeIdRouter) {
+ CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
+ List<DocRouter.Range> tmpSubRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
+ if (tmpSubRanges.size() == 1) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
+ + " has a hash range that is exactly equal to hash range of shard: " + parentSlice.getName());
+ }
+ for (DocRouter.Range subRange : tmpSubRanges) {
+ if (subRange.min == subRange.max) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
+ }
+ }
+ subRanges.addAll(tmpSubRanges);
+ log.info("Partitioning parent shard " + parentSlice.getName() + " range: " + parentSlice.getRange() + " yields: " + subRanges);
+ rangesStr = "";
+ for (int i = 0; i < subRanges.size(); i++) {
+ DocRouter.Range subRange = subRanges.get(i);
+ rangesStr += subRange.toString();
+ if (i < subRanges.size() - 1) rangesStr += ',';
+ }
+ }
+ } else {
+ // todo: fixed to two partitions?
+ subRanges.addAll(router.partitionRange(2, range));
+ }
+
+ for (int i = 0; i < subRanges.size(); i++) {
+ String subSlice = parentSlice.getName() + "_" + i;
+ subSlices.add(subSlice);
+ String subShardName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), collection, subSlice, Replica.Type.NRT);
+ subShardNames.add(subShardName);
+ }
+ return rangesStr;
+ }
}
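For illustration, the logic removed from split() above now lives in the two public static helpers at the end of the class; a minimal sketch of the call pattern (mirroring this diff, and assuming the ocmh, clusterState, message and collectionName values that split() already has in scope) is:

  AtomicReference<String> slice = new AtomicReference<>(message.getStr(ZkStateReader.SHARD_ID_PROP));
  String splitKey = message.getStr("split.key");
  DocCollection collection = clusterState.getCollection(collectionName);

  // resolve the parent slice either by explicit shard id or by split.key (CompositeIdRouter only)
  Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, slice, splitKey);

  // fillRanges populates the three output lists and returns the ranges string
  List<DocRouter.Range> subRanges = new ArrayList<>();
  List<String> subSlices = new ArrayList<>();
  List<String> subShardNames = new ArrayList<>();
  String rangesStr = SplitShardCmd.fillRanges(ocmh.cloudManager, message, collection, parentSlice,
      subRanges, subSlices, subShardNames);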
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index e61536b..039067c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -133,13 +133,13 @@ public class AutoScaling {
*/
public static class TriggerFactoryImpl extends TriggerFactory {
- private final SolrCloudManager dataProvider;
+ private final SolrCloudManager cloudManager;
private final SolrResourceLoader loader;
- public TriggerFactoryImpl(SolrResourceLoader loader, SolrCloudManager dataProvider) {
- Objects.requireNonNull(dataProvider);
+ public TriggerFactoryImpl(SolrResourceLoader loader, SolrCloudManager cloudManager) {
+ Objects.requireNonNull(cloudManager);
Objects.requireNonNull(loader);
- this.dataProvider = dataProvider;
+ this.cloudManager = cloudManager;
this.loader = loader;
}
@@ -150,11 +150,11 @@ public class AutoScaling {
}
switch (type) {
case NODEADDED:
- return new NodeAddedTrigger(name, props, loader, dataProvider);
+ return new NodeAddedTrigger(name, props, loader, cloudManager);
case NODELOST:
- return new NodeLostTrigger(name, props, loader, dataProvider);
+ return new NodeLostTrigger(name, props, loader, cloudManager);
case SEARCHRATE:
- return new SearchRateTrigger(name, props, loader, dataProvider);
+ return new SearchRateTrigger(name, props, loader, cloudManager);
default:
throw new IllegalArgumentException("Unknown event type: " + type + " in trigger: " + name);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
index 7ca0d36..86fd04a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
@@ -36,26 +36,23 @@ import java.util.stream.Stream;
import org.apache.solr.api.Api;
import org.apache.solr.api.ApiBag;
-import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.Preference;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
-import org.apache.solr.cloud.ZkDistributedQueueFactory;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.util.CommandOperation;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.RequestHandlerUtils;
import org.apache.solr.request.SolrQueryRequest;
@@ -63,7 +60,7 @@ import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.security.PermissionNameProvider;
-import org.apache.solr.util.TimeSource;
+import org.apache.solr.common.util.TimeSource;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,15 +77,18 @@ import static org.apache.solr.common.params.CommonParams.JSON;
public class AutoScalingHandler extends RequestHandlerBase implements PermissionNameProvider {
public static final String HANDLER_PATH = "/admin/autoscaling";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected final CoreContainer container;
+ protected final SolrCloudManager cloudManager;
+ protected final SolrResourceLoader loader;
private final List<Map<String, String>> DEFAULT_ACTIONS = new ArrayList<>(3);
private static Set<String> singletonCommands = Stream.of("set-cluster-preferences", "set-cluster-policy")
.collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
- private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
+ private final TimeSource timeSource;
- public AutoScalingHandler(CoreContainer container) {
- this.container = container;
+ public AutoScalingHandler(SolrCloudManager cloudManager, SolrResourceLoader loader) {
+ this.cloudManager = cloudManager;
+ this.loader = loader;
+ this.timeSource = cloudManager.getTimeSource();
Map<String, String> map = new HashMap<>(2);
map.put(NAME, "compute_plan");
map.put(CLASS, "solr.ComputePlanAction");
@@ -116,7 +116,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown path: " + path);
}
- AutoScalingConfig autoScalingConf = container.getZkController().zkStateReader.getAutoScalingConfig();
+ AutoScalingConfig autoScalingConf = cloudManager.getDistribStateManager().getAutoScalingConfig();
if (parts.size() == 2) {
autoScalingConf.writeMap(new MapWriter.EntryWriter() {
@@ -154,21 +154,14 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
private void handleSuggestions(SolrQueryResponse rsp, AutoScalingConfig autoScalingConf) throws IOException {
- try (CloudSolrClient build = new CloudSolrClient.Builder()
- .withHttpClient(container.getUpdateShardHandler().getHttpClient())
- .withZkHost(container.getZkController().getZkServerAddress()).build()) {
- DistributedQueueFactory queueFactory = new ZkDistributedQueueFactory(container.getZkController().getZkClient());
- rsp.getValues().add("suggestions",
- PolicyHelper.getSuggestions(autoScalingConf, new SolrClientCloudManager(queueFactory, build)));
- }
-
-
+ rsp.getValues().add("suggestions",
+ PolicyHelper.getSuggestions(autoScalingConf, cloudManager));
}
public void processOps(SolrQueryRequest req, SolrQueryResponse rsp, List<CommandOperation> ops)
throws KeeperException, InterruptedException, IOException {
while (true) {
- AutoScalingConfig initialConfig = container.getZkController().zkStateReader.getAutoScalingConfig();
+ AutoScalingConfig initialConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
AutoScalingConfig currentConfig = initialConfig;
for (CommandOperation op : ops) {
switch (op.name) {
@@ -216,7 +209,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
if (!currentConfig.equals(initialConfig)) {
// update in ZK
- if (zkSetAutoScalingConfig(container.getZkController().getZkStateReader(), currentConfig)) {
+ if (setAutoScalingConfig(currentConfig)) {
break;
} else {
// someone else updated the config, get the latest one and re-apply our ops
@@ -244,12 +237,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
private void handleDiagnostics(SolrQueryResponse rsp, AutoScalingConfig autoScalingConf) throws IOException {
Policy policy = autoScalingConf.getPolicy();
- try (CloudSolrClient build = new CloudSolrClient.Builder()
- .withHttpClient(container.getUpdateShardHandler().getHttpClient())
- .withZkHost(container.getZkController().getZkServerAddress()).build()) {
- DistributedQueueFactory queueFactory = new ZkDistributedQueueFactory(container.getZkController().getZkClient());
- rsp.getValues().add("diagnostics", PolicyHelper.getDiagnostics(policy, new SolrClientCloudManager(queueFactory, build)));
- }
+ rsp.getValues().add("diagnostics", PolicyHelper.getDiagnostics(policy, cloudManager));
}
private AutoScalingConfig handleSetClusterPolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
@@ -302,7 +290,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
return currentConfig;
}
- container.getZkController().getZkStateReader().getClusterState().forEachCollection(coll -> {
+ cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
if (policyName.equals(coll.getPolicyName()))
op.addError(StrUtils.formatString("policy : {0} is being used by collection {1}", policyName, coll.getName()));
});
@@ -470,7 +458,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
// validate that we can load the listener class
// todo allow creation from blobstore
try {
- container.getResourceLoader().findClass(listenerClass, TriggerListener.class);
+ loader.findClass(listenerClass, TriggerListener.class);
} catch (Exception e) {
log.warn("error loading listener class ", e);
op.addError("Listener not found: " + listenerClass + ". error message:" + e.getMessage());
@@ -535,7 +523,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
String klass = action.get(CLASS);
try {
- container.getResourceLoader().findClass(klass, TriggerAction.class);
+ loader.findClass(klass, TriggerAction.class);
} catch (Exception e) {
log.warn("Could not load class : ", e);
op.addError("Action not found: " + klass + " " + e.getMessage());
@@ -632,26 +620,22 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
- private boolean zkSetAutoScalingConfig(ZkStateReader reader, AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
+ private boolean setAutoScalingConfig(AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
verifyAutoScalingConf(currentConfig);
try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(currentConfig), currentConfig.getZkVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
+ cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(currentConfig), currentConfig.getZkVersion());
+ } catch (BadVersionException bve) {
// somebody else has changed the configuration so we must retry
return false;
}
+ //log.debug("-- saved version " + currentConfig.getZkVersion() + ": " + currentConfig);
return true;
}
private void verifyAutoScalingConf(AutoScalingConfig autoScalingConf) throws IOException {
- try (CloudSolrClient build = new CloudSolrClient.Builder()
- .withHttpClient(container.getUpdateShardHandler().getHttpClient())
- .withZkHost(container.getZkController().getZkServerAddress()).build()) {
- DistributedQueueFactory queueFactory = new ZkDistributedQueueFactory(container.getZkController().getZkClient());
- Policy.Session session = autoScalingConf.getPolicy()
- .createSession(new SolrClientCloudManager(queueFactory, build));
- log.debug("Verified autoscaling configuration");
- }
+ Policy.Session session = autoScalingConf.getPolicy()
+ .createSession(cloudManager);
+ log.debug("Verified autoscaling configuration");
}
@Override
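The handler now goes through SolrCloudManager for both reads and the conditional write, so the optimistic-concurrency loop in processOps/setAutoScalingConfig above boils down to roughly the following sketch (applyOps is a hypothetical stand-in for the per-command switch):

  while (true) {
    AutoScalingConfig initial = cloudManager.getDistribStateManager().getAutoScalingConfig();
    AutoScalingConfig modified = applyOps(initial, ops);    // hypothetical: runs the command operations
    if (modified.equals(initial)) break;                    // nothing changed, nothing to write
    try {
      cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH,
          Utils.toJSON(modified), modified.getZkVersion()); // conditional on the znode version we read
      break;
    } catch (BadVersionException bve) {
      // someone else updated the config concurrently; re-read and re-apply our ops
    }
  }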
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index ccffea7..8cce976 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -33,8 +33,10 @@ import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,12 +59,13 @@ public class ComputePlanAction extends TriggerActionBase {
if (autoScalingConf.isEmpty()) {
throw new Exception("Action: " + getName() + " executed but no policy is configured");
}
-
- // Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
-// return new PolicyHelper.SessionWrapper(session, null);
PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(cloudManager);
Policy.Session session = sessionWrapper.get();
-// Policy policy = autoScalingConf.getPolicy();
+ if (log.isTraceEnabled()) {
+ ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
+ log.trace("-- session: {}", session);
+ log.trace("-- state: {}", state);
+ }
try {
Suggester suggester = getSuggester(session, event, cloudManager);
while (true) {
@@ -99,12 +102,10 @@ public class ComputePlanAction extends TriggerActionBase {
case NODEADDED:
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
.hint(Suggester.Hint.TARGET_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
- log.debug("NODEADDED Created suggester with targetNode: {}", event.getProperty(TriggerEvent.NODE_NAMES));
break;
case NODELOST:
suggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
.hint(Suggester.Hint.SRC_NODE, event.getProperty(TriggerEvent.NODE_NAMES));
- log.debug("NODELOST Created suggester with srcNode: {}", event.getProperty(TriggerEvent.NODE_NAMES));
break;
case SEARCHRATE:
Map<String, Map<String, Double>> hotShards = (Map<String, Map<String, Double>>)event.getProperty(AutoScalingParams.SHARD);
@@ -125,13 +126,10 @@ public class ComputePlanAction extends TriggerActionBase {
} else {
// collection || shard || replica -> ADDREPLICA
suggester = session.getSuggester(CollectionParams.CollectionAction.ADDREPLICA);
- Set<String> collections = new HashSet<>();
- // XXX improve this when AddReplicaSuggester supports coll_shard hint
- hotReplicas.forEach(r -> collections.add(r.getCollection()));
- hotShards.forEach((coll, shards) -> collections.add(coll));
- hotCollections.forEach((coll, rate) -> collections.add(coll));
- for (String coll : collections) {
- suggester = suggester.hint(Suggester.Hint.COLL, coll);
+ Set<Pair> collectionShards = new HashSet<>();
+ hotShards.forEach((coll, shards) -> shards.forEach((s, r) -> collectionShards.add(new Pair(coll, s))));
+ for (Pair<String, String> colShard : collectionShards) {
+ suggester = suggester.hint(Suggester.Hint.COLL_SHARD, colShard);
}
}
break;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
index ebe0660..47b3440 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
@@ -56,7 +56,7 @@ public class ExecutePlanAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());
- SolrCloudManager dataProvider = context.getCloudManager();
+ SolrCloudManager cloudManager = context.getCloudManager();
List<SolrRequest> operations = (List<SolrRequest>) context.getProperty("operations");
if (operations == null || operations.isEmpty()) {
log.info("No operations to execute for event: {}", event);
@@ -64,7 +64,7 @@ public class ExecutePlanAction extends TriggerActionBase {
}
try {
for (SolrRequest operation : operations) {
- log.info("Executing operation: {}", operation.getParams());
+ log.debug("Executing operation: {}", operation.getParams());
try {
SolrResponse response = null;
int counter = 0;
@@ -73,22 +73,22 @@ public class ExecutePlanAction extends TriggerActionBase {
// waitForFinalState so that the end effects of operations are visible
req.setWaitForFinalState(true);
String asyncId = event.getSource() + '/' + event.getId() + '/' + counter;
- String znode = saveAsyncId(dataProvider.getDistribStateManager(), event, asyncId);
+ String znode = saveAsyncId(cloudManager.getDistribStateManager(), event, asyncId);
log.debug("Saved requestId: {} in znode: {}", asyncId, znode);
// TODO: find a better way of using async calls using dataProvider API !!!
req.setAsyncId(asyncId);
- SolrResponse asyncResponse = dataProvider.request(req);
+ SolrResponse asyncResponse = cloudManager.request(req);
if (asyncResponse.getResponse().get("error") != null) {
throw new IOException("" + asyncResponse.getResponse().get("error"));
}
asyncId = (String)asyncResponse.getResponse().get("requestid");
- CollectionAdminRequest.RequestStatusResponse statusResponse = waitForTaskToFinish(dataProvider, asyncId,
+ CollectionAdminRequest.RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, asyncId,
DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (statusResponse != null) {
RequestStatusState state = statusResponse.getRequestStatus();
if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) {
try {
- dataProvider.getDistribStateManager().removeData(znode, -1);
+ cloudManager.getDistribStateManager().removeData(znode, -1);
} catch (Exception e) {
log.warn("Unexpected exception while trying to delete znode: " + znode, e);
}
@@ -96,7 +96,7 @@ public class ExecutePlanAction extends TriggerActionBase {
response = statusResponse;
}
} else {
- response = dataProvider.request(operation);
+ response = cloudManager.request(operation);
}
NamedList<Object> result = response.getResponse();
context.getProperties().compute("responses", (s, o) -> {
@@ -106,16 +106,15 @@ public class ExecutePlanAction extends TriggerActionBase {
return responses;
});
} catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to talk to ZooKeeper", e);
-// } catch (InterruptedException e) {
-// Thread.currentThread().interrupt();
-// throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ExecutePlanAction was interrupted", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Unexpected exception executing operation: " + operation.getParams(), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ExecutePlanAction was interrupted", e);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected exception executing operation: " + operation.getParams(), e);
}
-
-// counter++;
}
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -124,22 +123,22 @@ public class ExecutePlanAction extends TriggerActionBase {
}
- static CollectionAdminRequest.RequestStatusResponse waitForTaskToFinish(SolrCloudManager dataProvider, String requestId, long duration, TimeUnit timeUnit) throws IOException, InterruptedException {
+ static CollectionAdminRequest.RequestStatusResponse waitForTaskToFinish(SolrCloudManager cloudManager, String requestId, long duration, TimeUnit timeUnit) throws IOException, InterruptedException {
long timeoutSeconds = timeUnit.toSeconds(duration);
RequestStatusState state = RequestStatusState.NOT_FOUND;
CollectionAdminRequest.RequestStatusResponse statusResponse = null;
for (int i = 0; i < timeoutSeconds; i++) {
try {
- statusResponse = (CollectionAdminRequest.RequestStatusResponse)dataProvider.request(CollectionAdminRequest.requestStatus(requestId));
+ statusResponse = (CollectionAdminRequest.RequestStatusResponse)cloudManager.request(CollectionAdminRequest.requestStatus(requestId));
state = statusResponse.getRequestStatus();
if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED) {
- log.info("Task with requestId={} finished with state={} in {}s", requestId, state, i * 5);
- dataProvider.request(CollectionAdminRequest.deleteAsyncId(requestId));
+ log.debug("Task with requestId={} finished with state={} in {}s", requestId, state, i * 5);
+ cloudManager.request(CollectionAdminRequest.deleteAsyncId(requestId));
return statusResponse;
} else if (state == RequestStatusState.NOT_FOUND) {
// the request for this id was never actually submitted! no harm done, just bail out
log.warn("Task with requestId={} was not found on overseer", requestId);
- dataProvider.request(CollectionAdminRequest.deleteAsyncId(requestId));
+ cloudManager.request(CollectionAdminRequest.deleteAsyncId(requestId));
return statusResponse;
}
} catch (Exception e) {
@@ -154,11 +153,12 @@ public class ExecutePlanAction extends TriggerActionBase {
throw e;
}
log.error("Unexpected Exception while querying status of requestId=" + requestId, e);
+ throw e;
}
if (i > 0 && i % 5 == 0) {
log.debug("Task with requestId={} still not complete after {}s. Last state={}", requestId, i * 5, state);
}
- TimeUnit.SECONDS.sleep(5);
+ cloudManager.getTimeSource().sleep(5000);
}
log.debug("Task with requestId={} did not complete within 5 minutes. Last state={}", requestId, state);
return statusResponse;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
index 2003cb8..0388472 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
@@ -66,8 +66,8 @@ public class HttpTriggerListener extends TriggerListenerBase {
private boolean followRedirects;
@Override
- public void init(SolrCloudManager dataProvider, AutoScalingConfig.TriggerListenerConfig config) {
- super.init(dataProvider, config);
+ public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(cloudManager, config);
urlTemplate = (String)config.properties.get("url");
payloadTemplate = (String)config.properties.get("payload");
contentType = (String)config.properties.get("contentType");
@@ -148,7 +148,7 @@ public class HttpTriggerListener extends TriggerListenerBase {
});
headers.put("Content-Type", type);
try {
- dataProvider.httpRequest(url, SolrRequest.METHOD.POST, headers, payload, timeout, followRedirects);
+ cloudManager.httpRequest(url, SolrRequest.METHOD.POST, headers, payload, timeout, followRedirects);
} catch (IOException e) {
LOG.warn("Exception sending request for event " + event, e);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 2ef1b1d..a1e19e9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -35,7 +35,6 @@ import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,8 +44,6 @@ import org.slf4j.LoggerFactory;
public class NodeAddedTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final TimeSource timeSource;
-
private Set<String> lastLiveNodes;
private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
@@ -55,7 +52,6 @@ public class NodeAddedTrigger extends TriggerBase {
SolrResourceLoader loader,
SolrCloudManager cloudManager) {
super(TriggerEventType.NODEADDED, name, properties, loader, cloudManager);
- this.timeSource = TimeSource.CURRENT_TIME;
lastLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
@@ -71,7 +67,7 @@ public class NodeAddedTrigger extends TriggerBase {
// don't add nodes that have since gone away
if (lastLiveNodes.contains(n)) {
log.debug("Adding node from marker path: {}", n);
- nodeNameVsTimeAdded.put(n, timeSource.getTime());
+ nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getTime());
}
removeMarker(n);
});
@@ -131,7 +127,7 @@ public class NodeAddedTrigger extends TriggerBase {
log.debug("Running NodeAddedTrigger {}", name);
Set<String> newLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
- log.debug("Found livenodes: {}", newLiveNodes);
+ log.debug("Found livenodes: {}", newLiveNodes.size());
// have any nodes that we were tracking been removed from the cluster?
// if so, remove them from the tracking map
@@ -142,7 +138,7 @@ public class NodeAddedTrigger extends TriggerBase {
Set<String> copyOfNew = new HashSet<>(newLiveNodes);
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
- long eventTime = timeSource.getTime();
+ long eventTime = cloudManager.getTimeSource().getTime();
log.debug("Tracking new node: {} at time {}", n, eventTime);
nodeNameVsTimeAdded.put(n, eventTime);
});
@@ -154,7 +150,7 @@ public class NodeAddedTrigger extends TriggerBase {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
- long now = timeSource.getTime();
+ long now = cloudManager.getTimeSource().getTime();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
nodeNames.add(nodeName);
times.add(timeAdded);
@@ -163,7 +159,8 @@ public class NodeAddedTrigger extends TriggerBase {
AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (!nodeNames.isEmpty()) {
if (processor != null) {
- log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}, now={}", name, nodeNames, times, timeSource.getTime());
+ log.debug("NodeAddedTrigger {} firing registered processor for nodes: {} added at times {}, now={}", name,
+ nodeNames, times, cloudManager.getTimeSource().getTime());
if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames))) {
// remove from tracking set only if the fire was accepted
nodeNames.forEach(n -> {
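Note that the trigger no longer keeps its own TimeSource field; all timestamps come from cloudManager.getTimeSource(), presumably so the simulation framework named in the subject can substitute its own clock. The wait-for check then reduces to a sketch like this (timeAdded and the surrounding loop are placeholders for the tracking map above):

  TimeSource timeSource = cloudManager.getTimeSource();
  long timeAdded = timeSource.getTime();   // nanoseconds, recorded when the node was first seen
  // ... on a later run() ...
  long now = timeSource.getTime();
  if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
    // enough (possibly simulated) time has elapsed, so fire the NodeAddedEvent
  }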
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index d53a354..57c76c0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -34,7 +34,6 @@ import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +43,6 @@ import org.slf4j.LoggerFactory;
public class NodeLostTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final TimeSource timeSource;
-
private Set<String> lastLiveNodes;
private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
@@ -54,7 +51,6 @@ public class NodeLostTrigger extends TriggerBase {
SolrResourceLoader loader,
SolrCloudManager dataProvider) {
super(TriggerEventType.NODELOST, name, properties, loader, dataProvider);
- this.timeSource = TimeSource.CURRENT_TIME;
lastLiveNodes = new HashSet<>(dataProvider.getClusterStateProvider().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
}
@@ -69,7 +65,7 @@ public class NodeLostTrigger extends TriggerBase {
// don't add nodes that have since came back
if (!lastLiveNodes.contains(n)) {
log.debug("Adding lost node from marker path: {}", n);
- nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+ nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTime());
}
removeMarker(n);
});
@@ -122,12 +118,12 @@ public class NodeLostTrigger extends TriggerBase {
synchronized (this) {
if (isClosed) {
log.warn("NodeLostTrigger ran but was already closed");
- throw new RuntimeException("Trigger has been closed");
+ return;
}
}
Set<String> newLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
- log.debug("Running NodeLostTrigger: {} with currently live nodes: {}", name, newLiveNodes);
+ log.debug("Running NodeLostTrigger: {} with currently live nodes: {}", name, newLiveNodes.size());
// have any nodes that we were tracking been added to the cluster?
// if so, remove them from the tracking map
@@ -139,7 +135,7 @@ public class NodeLostTrigger extends TriggerBase {
copyOfLastLiveNodes.removeAll(newLiveNodes);
copyOfLastLiveNodes.forEach(n -> {
log.debug("Tracking lost node: {}", n);
- nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+ nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTime());
});
// has enough time expired to trigger events for a node?
@@ -149,7 +145,7 @@ public class NodeLostTrigger extends TriggerBase {
Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeRemoved = entry.getValue();
- long now = timeSource.getTime();
+ long now = cloudManager.getTimeSource().getTime();
if (TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
nodeNames.add(nodeName);
times.add(timeRemoved);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 3171404..7a9390b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -57,7 +57,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final SolrCloudManager dataProvider;
+ private final SolrCloudManager cloudManager;
private final CloudConfig cloudConfig;
@@ -80,11 +80,11 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
private AutoScalingConfig autoScalingConfig;
- public OverseerTriggerThread(SolrResourceLoader loader, SolrCloudManager dataProvider, CloudConfig cloudConfig) {
- this.dataProvider = dataProvider;
+ public OverseerTriggerThread(SolrResourceLoader loader, SolrCloudManager cloudManager, CloudConfig cloudConfig) {
+ this.cloudManager = cloudManager;
this.cloudConfig = cloudConfig;
- scheduledTriggers = new ScheduledTriggers(loader, dataProvider);
- triggerFactory = new AutoScaling.TriggerFactoryImpl(loader, dataProvider);
+ scheduledTriggers = new ScheduledTriggers(loader, cloudManager);
+ triggerFactory = new AutoScaling.TriggerFactoryImpl(loader, cloudManager);
}
@Override
@@ -114,11 +114,11 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
// we automatically add a trigger for auto add replicas if it does not exists already
while (!isClosed) {
try {
- AutoScalingConfig autoScalingConfig = dataProvider.getDistribStateManager().getAutoScalingConfig();
+ AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
AutoScalingConfig withAutoAddReplicasTrigger = withAutoAddReplicasTrigger(autoScalingConfig);
if (withAutoAddReplicasTrigger.equals(autoScalingConfig)) break;
log.debug("Adding .autoAddReplicas trigger");
- dataProvider.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(withAutoAddReplicasTrigger), withAutoAddReplicasTrigger.getZkVersion());
+ cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(withAutoAddReplicasTrigger), withAutoAddReplicasTrigger.getZkVersion());
break;
} catch (BadVersionException bve) {
// somebody else has changed the configuration so we must retry
@@ -225,7 +225,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e);
}
}
- DistribStateManager stateManager = dataProvider.getDistribStateManager();
+ DistribStateManager stateManager = cloudManager.getDistribStateManager();
if (cleanOldNodeLostMarkers) {
log.debug("-- clean old nodeLost markers");
try {
@@ -259,7 +259,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
private void removeNodeMarker(String path, String nodeName) {
path = path + "/" + nodeName;
try {
- dataProvider.getDistribStateManager().removeData(path, -1);
+ cloudManager.getDistribStateManager().removeData(path, -1);
log.debug(" -- deleted " + path);
} catch (NoSuchElementException e) {
// ignore
@@ -297,7 +297,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
if (isClosed) {
return;
}
- AutoScalingConfig currentConfig = dataProvider.getDistribStateManager().getAutoScalingConfig(watcher);
+ AutoScalingConfig currentConfig = cloudManager.getDistribStateManager().getAutoScalingConfig(watcher);
log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, currentConfig.getZkVersion());
if (znodeVersion >= currentConfig.getZkVersion()) {
// protect against reordered watcher fires by ensuring that we only move forward
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 8278977..8ebdf1a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -38,12 +38,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
@@ -78,13 +78,14 @@ import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_SCHEDULE_D
*/
public class ScheduledTriggers implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
- static final int DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS = 5;
- static final int DEFAULT_COOLDOWN_PERIOD_SECONDS = 5;
- static final int DEFAULT_TRIGGER_CORE_POOL_SIZE = 4;
+ public static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
+ public static final int DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS = 5;
+ public static final int DEFAULT_COOLDOWN_PERIOD_SECONDS = 5;
+ public static final int DEFAULT_TRIGGER_CORE_POOL_SIZE = 4;
static final Map<String, Object> DEFAULT_PROPERTIES = new HashMap<>();
+ // Note: values must be all in milliseconds!
static {
DEFAULT_PROPERTIES.put(TRIGGER_SCHEDULE_DELAY_SECONDS, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
DEFAULT_PROPERTIES.put(TRIGGER_COOLDOWN_PERIOD_SECONDS, DEFAULT_COOLDOWN_PERIOD_SECONDS);
@@ -114,7 +115,7 @@ public class ScheduledTriggers implements Closeable {
private final AtomicLong cooldownPeriod = new AtomicLong(TimeUnit.SECONDS.toNanos(DEFAULT_COOLDOWN_PERIOD_SECONDS));
- private final AtomicInteger triggerDelay = new AtomicInteger(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
+ private final AtomicLong triggerDelay = new AtomicLong(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
private final AtomicReference<ActionThrottle> actionThrottle;
@@ -136,14 +137,14 @@ public class ScheduledTriggers implements Closeable {
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
- actionThrottle = new AtomicReference<>(new ActionThrottle("action", TimeUnit.SECONDS.toMillis(DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS)));
+ actionThrottle = new AtomicReference<>(new ActionThrottle("action", TimeUnit.SECONDS.toMillis(DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS), cloudManager.getTimeSource()));
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
this.loader = loader;
queueStats = new Stats();
listeners = new TriggerListeners();
// initialize cooldown timer
- cooldownStart.set(System.nanoTime() - cooldownPeriod.get());
+ cooldownStart.set(cloudManager.getTimeSource().getTime() - cooldownPeriod.get());
}
/**
@@ -168,7 +169,9 @@ public class ScheduledTriggers implements Closeable {
scheduledTriggers.forEach((s, scheduledTrigger) -> {
if (scheduledTrigger.scheduledFuture.cancel(false)) {
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(
- scheduledTrigger, 0, triggerDelay.get(), TimeUnit.SECONDS);
+ scheduledTrigger, 0,
+ cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
} else {
log.debug("Failed to cancel scheduled task: {}", s);
}
@@ -188,9 +191,10 @@ public class ScheduledTriggers implements Closeable {
if (oldThrottle.getLastActionStartedAt() != null) {
newThrottle = new ActionThrottle("action",
minMsBetweenActions,
- oldThrottle.getLastActionStartedAt());
+ oldThrottle.getLastActionStartedAt(),
+ cloudManager.getTimeSource());
} else {
- newThrottle = new ActionThrottle("action", minMsBetweenActions);
+ newThrottle = new ActionThrottle("action", minMsBetweenActions, cloudManager.getTimeSource());
}
this.actionThrottle.set(newThrottle);
break;
@@ -200,12 +204,17 @@ public class ScheduledTriggers implements Closeable {
this.autoScalingConfig = autoScalingConfig;
// reset cooldown and actionThrottle
- cooldownStart.set(System.nanoTime() - cooldownPeriod.get());
+ cooldownStart.set(cloudManager.getTimeSource().getTime() - cooldownPeriod.get());
actionThrottle.get().reset();
listeners.setAutoScalingConfig(autoScalingConfig);
}
+ @VisibleForTesting
+ void resetActionThrottle() {
+ actionThrottle.get().reset();
+ }
+
/**
* Adds a new trigger or replaces an existing one. The replaced trigger, if any, is closed
* <b>before</b> the new trigger is run. If a trigger is replaced with itself then this
@@ -261,7 +270,7 @@ public class ScheduledTriggers implements Closeable {
}
boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
AutoScaling.Trigger source = scheduledSource.trigger;
- if (source.isClosed()) {
+ if (scheduledSource.isClosed || source.isClosed()) {
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
log.warn(msg);
@@ -269,7 +278,7 @@ public class ScheduledTriggers implements Closeable {
return false;
}
// reject events during cooldown period
- if (cooldownStart.get() + cooldownPeriod.get() > System.nanoTime()) {
+ if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTime()) {
log.debug("-------- Cooldown period - rejecting event: " + event);
event.getProperties().put(TriggerEvent.COOLDOWN, true);
listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "In cooldown period.");
@@ -288,8 +297,16 @@ public class ScheduledTriggers implements Closeable {
listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);
List<TriggerAction> actions = source.getActions();
if (actions != null) {
+ if (actionExecutor.isShutdown()) {
+ String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s from trigger %s because the executor has already been closed", event.toString(), source);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
+ log.warn(msg);
+ // we do not want to lose this event just because the trigger was closed; perhaps a replacement will need it
+ return false;
+ }
actionExecutor.submit(() -> {
assert hasPendingActions.get();
+ long eventProcessingStart = cloudManager.getTimeSource().getTime();
log.debug("-- processing actions for " + event);
try {
// let the action executor thread wait instead of the trigger thread so we use the throttle here
@@ -324,9 +341,11 @@ public class ScheduledTriggers implements Closeable {
} catch (Exception e) {
log.warn("Exception executing actions", e);
} finally {
- cooldownStart.set(System.nanoTime());
+ cooldownStart.set(cloudManager.getTimeSource().getTime());
hasPendingActions.set(false);
}
+ log.debug("-- processing took {} ms for event id={}",
+ TimeUnit.NANOSECONDS.toMillis(cloudManager.getTimeSource().getTime() - eventProcessingStart), event.id);
});
} else {
if (enqueued) {
@@ -347,7 +366,9 @@ public class ScheduledTriggers implements Closeable {
}
});
newTrigger.init(); // mark as ready for scheduling
- scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, triggerDelay.get(), TimeUnit.SECONDS);
+ scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0,
+ cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS),
+ TimeUnit.MILLISECONDS);
}
private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List<TriggerAction> actions) throws AlreadyClosedException {
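(Aside, not part of the diff.) The ScheduledTriggers hunks above route every time read and every trigger scheduling delay through cloudManager.getTimeSource() instead of System.nanoTime() and fixed TimeUnit.SECONDS delays, which is what allows the simulation framework to substitute its own clock. The following is a minimal, self-contained sketch of that pattern under stated assumptions; NanoClock, WallClock and SimClock are made-up names for illustration, not Solr's actual TimeSource API, and the 100x speedup is only an example of what a simulated source might do.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative pluggable clock in the spirit of cloudManager.getTimeSource();
// class and method names here are invented for the sketch, not Solr's API.
interface NanoClock {
  long getTimeNanos();
  long convertDelay(TimeUnit fromUnit, long value, TimeUnit toUnit);
}

final class WallClock implements NanoClock {
  public long getTimeNanos() { return System.nanoTime(); }
  public long convertDelay(TimeUnit fromUnit, long value, TimeUnit toUnit) {
    return toUnit.convert(value, fromUnit); // real time: a plain unit conversion
  }
}

final class SimClock implements NanoClock {
  private final AtomicLong nanos = new AtomicLong();
  private final long speedup;
  SimClock(long speedup) { this.speedup = speedup; }
  public long getTimeNanos() { return nanos.get(); }
  public void advance(long deltaNanos) { nanos.addAndGet(deltaNanos); } // tests move time explicitly
  public long convertDelay(TimeUnit fromUnit, long value, TimeUnit toUnit) {
    // simulated time: shrink real-world delays so scheduled work fires sooner
    return Math.max(1, toUnit.convert(value, fromUnit) / speedup);
  }
}

public class TriggerClockSketch {
  public static void main(String[] args) {
    NanoClock clock = new SimClock(100);
    long cooldownPeriod = TimeUnit.SECONDS.toNanos(5);
    // start with the cooldown already expired, as the constructor hunk does
    long cooldownStart = clock.getTimeNanos() - cooldownPeriod;
    boolean inCooldown = cooldownStart + cooldownPeriod > clock.getTimeNanos();
    System.out.println("in cooldown: " + inCooldown);
    // a 1 second trigger delay becomes a 10 ms scheduling delay under the 100x clock
    System.out.println("delay ms: " + clock.convertDelay(TimeUnit.SECONDS, 1, TimeUnit.MILLISECONDS));
  }
}

Because the delay is converted through the clock before being handed to the ScheduledThreadPoolExecutor, a wall-clock source keeps the original cadence while a simulated source can compress it without touching the trigger code.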
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
index 0c6ffd4..ec3110e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
@@ -37,7 +37,6 @@ import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.metrics.SolrCoreMetricManager;
-import org.apache.solr.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +46,6 @@ import org.slf4j.LoggerFactory;
public class SearchRateTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final TimeSource timeSource;
private final String handler;
private final String collection;
private final String shard;
@@ -63,7 +61,6 @@ public class SearchRateTrigger extends TriggerBase {
SolrResourceLoader loader,
SolrCloudManager cloudManager) {
super(TriggerEventType.SEARCHRATE, name, properties, loader, cloudManager);
- this.timeSource = TimeSource.CURRENT_TIME;
this.state.put("lastCollectionEvent", lastCollectionEvent);
this.state.put("lastNodeEvent", lastNodeEvent);
this.state.put("lastShardEvent", lastShardEvent);
@@ -168,6 +165,9 @@ public class SearchRateTrigger extends TriggerBase {
});
});
});
+ if (metricTags.isEmpty()) {
+ continue;
+ }
Map<String, Object> rates = cloudManager.getNodeStateProvider().getNodeValues(node, metricTags.keySet());
rates.forEach((tag, rate) -> {
ReplicaInfo info = metricTags.get(tag);
@@ -184,7 +184,7 @@ public class SearchRateTrigger extends TriggerBase {
});
}
- long now = timeSource.getTime();
+ long now = cloudManager.getTimeSource().getTime();
// check for exceeded rates and filter out those with less than waitFor from previous events
Map<String, Double> hotNodes = nodeRates.entrySet().stream()
.filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey()))
@@ -274,7 +274,7 @@ public class SearchRateTrigger extends TriggerBase {
private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
- log.debug("name=" + name + ", lastTime=" + lastTime + ", elapsed=" + elapsed);
+ log.trace("name={}, lastTime={}, elapsed={}", name, lastTime, elapsed);
if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
index 34761f2..3282075 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
@@ -72,8 +72,8 @@ public class SystemLogListener extends TriggerListenerBase {
private boolean enabled = true;
@Override
- public void init(SolrCloudManager dataProvider, AutoScalingConfig.TriggerListenerConfig config) {
- super.init(dataProvider, config);
+ public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(cloudManager, config);
collection = (String)config.properties.getOrDefault(CollectionAdminParams.COLLECTION, CollectionAdminParams.SYSTEM_COLL);
enabled = Boolean.parseBoolean(String.valueOf(config.properties.getOrDefault("enabled", true)));
}
@@ -119,7 +119,7 @@ public class SystemLogListener extends TriggerListenerBase {
UpdateRequest req = new UpdateRequest();
req.add(doc);
req.setParam(CollectionAdminParams.COLLECTION, collection);
- dataProvider.request(req);
+ cloudManager.request(req);
} catch (Exception e) {
if ((e instanceof SolrException) && e.getMessage().contains("Collection not found")) {
// relatively benign
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
index f40a49f..311c735 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -27,7 +27,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.Stats;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.util.TimeSource;
+import org.apache.solr.common.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@ public class TriggerEventQueue {
// TODO: collect stats
this.delegate = cloudManager.getDistributedQueueFactory().makeQueue(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName);
this.triggerName = triggerName;
- this.timeSource = TimeSource.CURRENT_TIME;
+ this.timeSource = cloudManager.getTimeSource();
}
public boolean offerEvent(TriggerEvent event) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
index 507c77d..61a95db 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
@@ -27,11 +27,11 @@ import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
public abstract class TriggerListenerBase implements TriggerListener {
protected AutoScalingConfig.TriggerListenerConfig config;
- protected SolrCloudManager dataProvider;
+ protected SolrCloudManager cloudManager;
@Override
- public void init(SolrCloudManager dataProvider, AutoScalingConfig.TriggerListenerConfig config) {
- this.dataProvider = dataProvider;
+ public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+ this.cloudManager = cloudManager;
this.config = config;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index c14f180..951c752 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -43,12 +43,12 @@ import static org.apache.solr.common.params.CommonParams.NAME;
public class CollectionMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected final SolrCloudManager dataProvider;
+ protected final SolrCloudManager cloudManager;
protected final DistribStateManager stateManager;
- public CollectionMutator(SolrCloudManager dataProvider) {
- this.dataProvider = dataProvider;
- this.stateManager = dataProvider.getDistribStateManager();
+ public CollectionMutator(SolrCloudManager cloudManager) {
+ this.cloudManager = cloudManager;
+ this.stateManager = cloudManager.getDistribStateManager();
}
public ZkWriteCommand createShard(final ClusterState clusterState, ZkNodeProps message) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index 035de68..dbcdd3d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -245,7 +245,7 @@ public class ReplicaMutator {
log.debug("node=" + coreNodeName + " is already registered");
} else {
// if coreNodeName is null, auto assign one
- coreNodeName = Assign.assignNode(stateManager, collection);
+ coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
}
message.getProperties().put(ZkStateReader.CORE_NODE_NAME_PROP,
coreNodeName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 6e820b0..6718a80 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -73,7 +73,7 @@ public class SliceMutator {
if (message.getStr(ZkStateReader.CORE_NODE_NAME_PROP) != null) {
coreNodeName = message.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
} else {
- coreNodeName = Assign.assignNode(stateManager, collection);
+ coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
}
Replica replica = new Replica(coreNodeName,
makeMap(
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 5ed72d7..eb13775 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -540,8 +540,6 @@ public class CoreContainer {
// may want to add some configuration here in the future
metricsCollectorHandler.init(null);
- autoScalingHandler = createHandler(AutoScalingHandler.HANDLER_PATH, AutoScalingHandler.class.getName(), AutoScalingHandler.class);
-
containerHandlers.put(AUTHZ_PATH, securityConfHandler);
securityConfHandler.initializeMetrics(metricManager, SolrInfoBean.Group.node.toString(), AUTHZ_PATH);
containerHandlers.put(AUTHC_PATH, securityConfHandler);
@@ -690,6 +688,10 @@ public class CoreContainer {
if (isZooKeeperAware()) {
zkSys.getZkController().checkOverseerDesignate();
+ // initialize this handler here when SolrCloudManager is ready
+ autoScalingHandler = new AutoScalingHandler(getZkController().getSolrCloudManager(), loader);
+ containerHandlers.put(AutoScalingHandler.HANDLER_PATH, autoScalingHandler);
+ autoScalingHandler.initializeMetrics(metricManager, SolrInfoBean.Group.node.toString(), AutoScalingHandler.HANDLER_PATH);
}
// This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
index 6ef5ebd..d8f3d52 100644
--- a/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
+++ b/solr/core/src/java/org/apache/solr/handler/CdcrReplicatorManager.java
@@ -45,6 +45,7 @@ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.util.TimeOut;
@@ -273,7 +274,7 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
}
- TimeOut timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ TimeOut timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
if (closed) {
log.warn("Cancelling waiting for bootstrap on target: {} shard: {} to complete", targetCollection, shard);
@@ -285,7 +286,7 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
try {
log.info("CDCR bootstrap running for {} seconds, sleeping for {} ms",
BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS), BOOTSTRAP_RETRY_DELAY_MS);
- Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+ timeOut.sleep(BOOTSTRAP_RETRY_DELAY_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -309,7 +310,7 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
while (!closed && sendBootstrapCommand() != BootstrapStatus.SUBMITTED) {
Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
}
- timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // reset the timer
+ timeOut = new TimeOut(BOOTSTRAP_TIMEOUT_SECONDS, TimeUnit.SECONDS, TimeSource.NANO_TIME); // reset the timer
retries++;
}
} else if (status == BootstrapStatus.NOTFOUND || status == BootstrapStatus.CANCELLED) {
@@ -321,13 +322,13 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
}
retries = 1;
- timeOut = new TimeOut(6L * 3600L * 3600L, TimeUnit.SECONDS); // reset the timer
+ timeOut = new TimeOut(6L * 3600L * 3600L, TimeUnit.SECONDS, TimeSource.NANO_TIME); // reset the timer
} else if (status == BootstrapStatus.UNKNOWN || status == BootstrapStatus.SUBMITTED) {
log.info("CDCR bootstrap is " + (status == BootstrapStatus.UNKNOWN ? "unknown" : "submitted"),
BOOTSTRAP_TIMEOUT_SECONDS - timeOut.timeLeft(TimeUnit.SECONDS));
// we were not able to query the status on the remote end
// so just sleep for a bit and try again
- Thread.sleep(BOOTSTRAP_RETRY_DELAY_MS);
+ timeOut.sleep(BOOTSTRAP_RETRY_DELAY_MS);
}
}
} catch (InterruptedException e) {
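(Aside, not part of the diff.) The CdcrReplicatorManager hunks, like the similar TimeOut changes further down in HttpSolrCall, SchemaManager and DistributedUpdateProcessor, replace bare Thread.sleep() and implicit wall-clock timeouts with a TimeOut built over an explicit time source, so long bootstrap waits can presumably be compressed under simulation. A small sketch of such a poll-until-timeout helper; SimpleTimeOut below is an illustration written for this note, not Solr's TimeOut class.

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Illustrative only: a tiny TimeOut-style helper over an injectable clock,
// in the spirit of new TimeOut(n, TimeUnit.SECONDS, TimeSource.NANO_TIME).
public class SimpleTimeOut {
  private final long deadlineNanos;
  private final LongSupplier nowNanos;

  public SimpleTimeOut(long value, TimeUnit unit, LongSupplier nowNanos) {
    this.nowNanos = nowNanos;
    this.deadlineNanos = nowNanos.getAsLong() + unit.toNanos(value);
  }

  public boolean hasTimedOut() { return nowNanos.getAsLong() >= deadlineNanos; }

  public long timeLeft(TimeUnit unit) {
    return unit.convert(Math.max(0L, deadlineNanos - nowNanos.getAsLong()), TimeUnit.NANOSECONDS);
  }

  public void sleep(long ms) throws InterruptedException {
    Thread.sleep(ms); // a simulated clock could shorten or skip this wait entirely
  }

  // Poll a condition with a retry delay until it holds or the timeout expires.
  public static boolean waitFor(BooleanSupplier condition, SimpleTimeOut timeOut, long retryDelayMs)
      throws InterruptedException {
    while (!timeOut.hasTimedOut()) {
      if (condition.getAsBoolean()) {
        return true;
      }
      timeOut.sleep(retryDelayMs);
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    SimpleTimeOut t = new SimpleTimeOut(1, TimeUnit.SECONDS, System::nanoTime);
    boolean ok = waitFor(() -> false, t, 100); // condition never holds, so this expires after about 1 s
    System.out.println("completed before timeout: " + ok);
  }
}

Routing both the deadline check and the retry sleep through one helper keeps the waiting policy in a single place, which is what makes swapping in a non-wall-clock source straightforward.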
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index de066d5..d339f27 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -391,7 +391,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
results.add("status", status);
}
- enum CollectionOperation implements CollectionOp {
+ public enum CollectionOperation implements CollectionOp {
/**
* very simple currently, you can pass a template collection, and the new collection is created on
* every node the template collection is on
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/schema/SchemaManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/schema/SchemaManager.java b/solr/core/src/java/org/apache/solr/schema/SchemaManager.java
index d75fccd..217c726 100644
--- a/solr/core/src/java/org/apache/solr/schema/SchemaManager.java
+++ b/solr/core/src/java/org/apache/solr/schema/SchemaManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrResourceLoader;
@@ -93,7 +94,7 @@ public class SchemaManager {
if (timeout < 1) {
timeout = 600;
}
- TimeOut timeOut = new TimeOut(timeout, TimeUnit.SECONDS);
+ TimeOut timeOut = new TimeOut(timeout, TimeUnit.SECONDS, TimeSource.NANO_TIME);
SolrCore core = req.getCore();
String errorMsg = "Unable to persist managed schema. ";
List errors = Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 0e575d6..ba4aa13 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -78,6 +78,7 @@ import org.apache.solr.common.util.JsonSchemaValidator;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ValidatingJsonMap;
import org.apache.solr.core.CoreContainer;
@@ -347,7 +348,7 @@ public class HttpSolrCall {
if (rsp.getValues().get("success") == null) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not auto-create " + SYSTEM_COLL + " collection: "+ Utils.toJSONString(rsp.getValues()));
}
- TimeOut timeOut = new TimeOut(3, TimeUnit.SECONDS);
+ TimeOut timeOut = new TimeOut(3, TimeUnit.SECONDS, TimeSource.NANO_TIME);
for (; ; ) {
if (cores.getZkController().getClusterState().getCollectionOrNull(SYSTEM_COLL) != null) {
break;
@@ -355,7 +356,7 @@ public class HttpSolrCall {
if (timeOut.hasTimedOut()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find " + SYSTEM_COLL + " collection even after 3 seconds");
}
- Thread.sleep(50);
+ timeOut.sleep(50);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 9f9b742..de031a2 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -68,6 +68,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.component.RealTimeGetComponent;
@@ -1142,7 +1143,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private long waitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate,
boolean isReplayOrPeersync, VersionBucket bucket) throws IOException {
long lastFoundVersion = 0;
- TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS);
+ TimeOut waitTimeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
vinfo.lockForUpdate();
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/java/org/apache/solr/util/IdUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/IdUtils.java b/solr/core/src/java/org/apache/solr/util/IdUtils.java
index a6ea7d2..4f841f3 100644
--- a/solr/core/src/java/org/apache/solr/util/IdUtils.java
+++ b/solr/core/src/java/org/apache/solr/util/IdUtils.java
@@ -19,6 +19,7 @@ package org.apache.solr.util;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.StringHelper;
+import org.apache.solr.common.util.TimeSource;
/**
* Helper class for generating unique ID-s.