You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/04/23 17:32:43 UTC
lucene-solr:jira/solr-11833: SOLR-11833: Fix the waitFor
implementation, add support for back-compat config.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-11833 516aad5a5 -> 14824ca38
SOLR-11833: Fix the waitFor implementation, add support for back-compat config.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/14824ca3
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/14824ca3
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/14824ca3
Branch: refs/heads/jira/solr-11833
Commit: 14824ca3834fc915a59e0457ef79986d12ca8623
Parents: 516aad5
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon Apr 23 19:32:09 2018 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon Apr 23 19:32:09 2018 +0200
----------------------------------------------------------------------
.../cloud/autoscaling/SearchRateTrigger.java | 100 ++++++++--
.../org/apache/solr/cloud/CloudTestUtils.java | 8 +-
.../SearchRateTriggerIntegrationTest.java | 35 ++--
.../autoscaling/SearchRateTriggerTest.java | 190 +++++++++++++++++--
.../solrj/impl/SolrClientCloudManager.java | 2 +-
5 files changed, 278 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14824ca3/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
index 238fcde..1824f7f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
@@ -67,6 +68,11 @@ public class SearchRateTrigger extends TriggerBase {
public static final String ABOVE_NODE_OP_PROP = "aboveNodeOp";
public static final String BELOW_NODE_OP_PROP = "belowNodeOp";
+ // back-compat
+ public static final String BC_COLLECTION_PROP = "collection";
+ public static final String BC_RATE_PROP = "rate";
+
+
public static final String HOT_NODES = "hotNodes";
public static final String HOT_COLLECTIONS = "hotCollections";
public static final String HOT_SHARDS = "hotShards";
@@ -110,7 +116,10 @@ public class SearchRateTrigger extends TriggerBase {
ABOVE_NODE_OP_PROP,
BELOW_NODE_OP_PROP,
ABOVE_RATE_PROP,
- BELOW_RATE_PROP);
+ BELOW_RATE_PROP,
+ // back-compat props
+ BC_COLLECTION_PROP,
+ BC_RATE_PROP);
}
@Override
@@ -121,6 +130,13 @@ public class SearchRateTrigger extends TriggerBase {
if (collectionsStr != null) {
collections.addAll(StrUtils.splitSmart(collectionsStr, ','));
}
+ // check back-compat collection prop
+ collectionsStr = (String)properties.get(BC_COLLECTION_PROP);
+ if (collectionsStr != null) {
+ if (!collectionsStr.equals(Policy.ANY)) {
+ collections.add(collectionsStr);
+ }
+ }
shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
if (!shard.equals(Policy.ANY) && (collections.isEmpty() || collections.size() > 1)) {
throw new TriggerValidationException(name, AutoScalingParams.SHARD, "When 'shard' is other than #ANY then exactly one collection name must be set");
@@ -149,6 +165,10 @@ public class SearchRateTrigger extends TriggerBase {
Object above = properties.get(ABOVE_RATE_PROP);
Object below = properties.get(BELOW_RATE_PROP);
+ // back-compat rate prop
+ if (properties.containsKey(BC_RATE_PROP)) {
+ above = properties.get(BC_RATE_PROP);
+ }
if (above == null && below == null) {
throw new TriggerValidationException(name, ABOVE_RATE_PROP, "at least one of '" +
ABOVE_RATE_PROP + "' or '" + BELOW_RATE_PROP + "' must be set");
@@ -199,6 +219,25 @@ public class SearchRateTrigger extends TriggerBase {
}
}
+ @VisibleForTesting
+ Map<String, Object> getConfig() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("name", name);
+ config.put(COLLECTIONS_PROP, collections);
+ config.put(AutoScalingParams.SHARD, shard);
+ config.put(AutoScalingParams.NODE, node);
+ config.put(METRIC_PROP, metric);
+ config.put(MAX_OPS_PROP, maxOps);
+ config.put(MIN_REPLICAS_PROP, minReplicas);
+ config.put(ABOVE_RATE_PROP, aboveRate);
+ config.put(BELOW_RATE_PROP, belowRate);
+ config.put(ABOVE_OP_PROP, aboveOp);
+ config.put(ABOVE_NODE_OP_PROP, aboveNodeOp);
+ config.put(BELOW_OP_PROP, belowOp);
+ config.put(BELOW_NODE_OP_PROP, belowNodeOp);
+ return config;
+ }
+
@Override
protected Map<String, Object> getState() {
return state;
@@ -280,10 +319,7 @@ public class SearchRateTrigger extends TriggerBase {
shards.forEach((sh, replicas) -> {
AtomicInteger repl = replPerShard.computeIfAbsent(sh, s -> new AtomicInteger());
replicas.forEach(replica -> {
- // skip PULL and non-active replicas
- if (replica.getType().equals(Replica.Type.PULL)) {
- return;
- }
+ // skip non-active replicas
if (replica.getState() != Replica.State.ACTIVE) {
return;
}
@@ -325,12 +361,19 @@ public class SearchRateTrigger extends TriggerBase {
// check for exceeded rates and filter out those with less than waitFor from previous events
nodeRates.entrySet().stream()
.filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey()))
- .filter(entry -> waitForElapsed(entry.getKey(), now, lastNodeEvent))
.forEach(entry -> {
if (entry.getValue().get() > aboveRate) {
- hotNodes.put(entry.getKey(), entry.getValue().get());
+ if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
+ hotNodes.put(entry.getKey(), entry.getValue().get());
+ }
} else if (entry.getValue().get() < belowRate) {
- coldNodes.put(entry.getKey(), entry.getValue().get());
+ if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
+ coldNodes.put(entry.getKey(), entry.getValue().get());
+ }
+ } else {
+ // no violation - clear waitForElapsed
+ // (violation is only valid if it persists throughout waitFor)
+ lastNodeEvent.remove(entry.getKey());
}
});
@@ -342,23 +385,36 @@ public class SearchRateTrigger extends TriggerBase {
shardRates.forEach((sh, replicaRates) -> {
double shardRate = replicaRates.stream()
.map(r -> {
- if (waitForElapsed(r.getCollection() + "." + r.getCore(), now, lastReplicaEvent)) {
- if (((Double)r.getVariable(AutoScalingParams.RATE) > aboveRate)) {
+ String elapsedKey = r.getCollection() + "." + r.getCore();
+ if ((Double)r.getVariable(AutoScalingParams.RATE) > aboveRate) {
+ if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
hotReplicas.add(r);
- } else if (((Double)r.getVariable(AutoScalingParams.RATE) < belowRate)) {
+ }
+ } else if ((Double)r.getVariable(AutoScalingParams.RATE) < belowRate) {
+ if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
coldReplicas.add(r);
}
+ } else {
+ // no violation - clear waitForElapsed
+ lastReplicaEvent.remove(elapsedKey);
}
return r;
})
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
- if (waitForElapsed(coll + "." + sh, now, lastShardEvent) &&
- (collections.isEmpty() || collections.contains(coll)) &&
+ String elapsedKey = coll + "." + sh;
+ if ((collections.isEmpty() || collections.contains(coll)) &&
(shard.equals(Policy.ANY) || shard.equals(sh))) {
if (shardRate > aboveRate) {
- hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
+ hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ }
} else if (shardRate < belowRate) {
- coldShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
+ coldShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
+ }
+ } else {
+ // no violation - clear waitForElapsed
+ lastShardEvent.remove(elapsedKey);
}
}
});
@@ -370,12 +426,18 @@ public class SearchRateTrigger extends TriggerBase {
double total = shardRates.entrySet().stream()
.mapToDouble(e -> e.getValue().stream()
.mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
- if (waitForElapsed(coll, now, lastCollectionEvent) &&
- (collections.isEmpty() || collections.contains(coll))) {
+ if (collections.isEmpty() || collections.contains(coll)) {
if (total > aboveRate) {
- hotCollections.put(coll, total);
+ if (waitForElapsed(coll, now, lastCollectionEvent)) {
+ hotCollections.put(coll, total);
+ }
} else if (total < belowRate) {
- coldCollections.put(coll, total);
+ if (waitForElapsed(coll, now, lastCollectionEvent)) {
+ coldCollections.put(coll, total);
+ }
+ } else {
+ // no violation - clear waitForElapsed
+ lastCollectionEvent.remove(coll);
}
}
});
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14824ca3/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
index 5590252..768cd91 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CloudTestUtils.java
@@ -88,9 +88,11 @@ public class CloudTestUtils {
final CollectionStatePredicate predicate) throws InterruptedException, TimeoutException, IOException {
TimeOut timeout = new TimeOut(wait, unit, cloudManager.getTimeSource());
long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4;
+ ClusterState state = null;
+ DocCollection coll = null;
while (!timeout.hasTimedOut()) {
- ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
- DocCollection coll = state.getCollectionOrNull(collection);
+ state = cloudManager.getClusterStateProvider().getClusterState();
+ coll = state.getCollectionOrNull(collection);
// due to the way we manage collections in SimClusterStateProvider a null here
// can mean that a collection is still being created but has no replicas
if (coll == null) { // does not yet exist?
@@ -106,7 +108,7 @@ public class CloudTestUtils {
log.trace("-- still not matching predicate: {}", state);
}
}
- throw new TimeoutException();
+ throw new TimeoutException("last state: " + coll);
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14824ca3/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
index 86b7f5f..098d0a5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
@@ -277,14 +277,14 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
public void testBelowSearchRate() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
String COLL1 = "belowRate_collection";
+ // replicationFactor == 2
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
"conf", 1, 2);
create.process(solrClient);
CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
CloudTestUtils.clusterShape(1, 2));
- // add a couple of spare replicas above RF. Use different types to verify that only
- // searchable replicas are considered
+ // add a couple of spare replicas above RF. Use different types.
// these additional replicas will be placed on other nodes in the cluster
solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.NRT));
solrClient.request(CollectionAdminRequest.addReplicaToShard(COLL1, "shard1", Replica.Type.TLOG));
@@ -390,8 +390,8 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(ev.toString(), "compute", ev.actionName);
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
- // 3 cold nodes, 2 cold replicas
- assertEquals(ops.toString(), 5, ops.size());
+ // 4 cold nodes, 3 cold replicas
+ assertEquals(ops.toString(), 7, ops.size());
AtomicInteger coldNodes = new AtomicInteger();
AtomicInteger coldReplicas = new AtomicInteger();
ops.forEach(op -> {
@@ -403,12 +403,12 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
fail("unexpected op: " + op);
}
});
- assertEquals("cold nodes", 3, coldNodes.get());
- assertEquals("cold replicas", 2, coldReplicas.get());
+ assertEquals("cold nodes", 4, coldNodes.get());
+ assertEquals("cold replicas", 3, coldReplicas.get());
- // now the collection should be back to RF = 2, with one additional PULL replica
+ // now the collection should be down to RF = 2
CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
- CloudTestUtils.clusterShape(1, 3));
+ CloudTestUtils.clusterShape(1, 2));
listenerEvents.clear();
finished = new CountDownLatch(1);
@@ -508,7 +508,7 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
// now the collection should be at RF == 1
CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
- CloudTestUtils.clusterShape(1, 2));
+ CloudTestUtils.clusterShape(1, 1));
}
@Test
@@ -634,8 +634,8 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(ev.toString(), "compute", ev.actionName);
List<TriggerEvent.Op> ops = (List<TriggerEvent.Op>)ev.event.getProperty(TriggerEvent.REQUESTED_OPS);
assertNotNull("there should be some requestedOps: " + ev.toString(), ops);
- // 3 DELETEREPLICA, 3 DELETENODE
- assertEquals(ops.toString(), 6, ops.size());
+ // 4 DELETEREPLICA, 4 DELETENODE
+ assertEquals(ops.toString(), 8, ops.size());
AtomicInteger replicas = new AtomicInteger();
AtomicInteger nodes = new AtomicInteger();
ops.forEach(op -> {
@@ -647,14 +647,14 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
fail("unexpected op: " + op);
}
});
- assertEquals(ops.toString(), 3, replicas.get());
- assertEquals(ops.toString(), 3, nodes.get());
+ assertEquals(ops.toString(), 4, replicas.get());
+ assertEquals(ops.toString(), 4, nodes.get());
// check status
ev = events.get(1);
assertEquals(ev.toString(), "execute", ev.actionName);
List<NamedList<Object>> responses = (List<NamedList<Object>>)ev.context.get("properties.responses");
assertNotNull(ev.toString(), responses);
- assertEquals(responses.toString(), 6, responses.size());
+ assertEquals(responses.toString(), 8, responses.size());
replicas.set(0);
nodes.set(0);
responses.forEach(m -> {
@@ -672,9 +672,12 @@ public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
}
});
- // we are left with one searchable replica and one PULL replica
+ assertEquals(responses.toString(), 4, replicas.get());
+ assertEquals(responses.toString(), 4, nodes.get());
+
+ // we are left with one searchable replica
CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS,
- CloudTestUtils.clusterShape(1, 2));
+ CloudTestUtils.clusterShape(1, 1));
}
public static class CapturingTriggerListener extends TriggerListenerBase {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14824ca3/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
index 50dd71b..cea41b4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerTest.java
@@ -18,25 +18,41 @@ package org.apache.solr.cloud.autoscaling;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
+import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.ZkDistributedQueueFactory;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.util.TimeOut;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -59,6 +75,23 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
.configure();
+ }
+
+ @Before
+ public void removeCollections() throws Exception {
+ cluster.deleteAllCollections();
+ if (cluster.getJettySolrRunners().size() < 4) {
+ cluster.startJettySolrRunner();
+ }
+ }
+
+ @Test
+ public void testTrigger() throws Exception {
+ SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
+ SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+ CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
+ SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
+
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
"conf", 2, 2);
CloudSolrClient solrClient = cluster.getSolrClient();
@@ -68,20 +101,15 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
"conf", 2, 2);
create.setMaxShardsPerNode(1);
create.process(solrClient);
- }
- @Test
- public void testTrigger() throws Exception {
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+ CloudTestUtils.waitForState(cloudManager, COLL2, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+
double rate = 1.0;
- SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
- SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
- CoreContainer container = cluster.getJettySolrRunner(0).getCoreContainer();
- SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
URL baseUrl = cluster.getJettySolrRunners().get(1).getBaseUrl();
long waitForSeconds = 5 + random().nextInt(5);
- Map<String, Object> props = createTriggerProps(waitForSeconds, rate, -1);
+ Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, rate, -1);
final List<TriggerEvent> events = new ArrayList<>();
- CloudSolrClient solrClient = cluster.getSolrClient();
try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger")) {
trigger.configure(loader, cloudManager, props);
@@ -95,13 +123,15 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
String url = baseUrl.toString() + "/" + coreName;
try (HttpSolrClient simpleClient = new HttpSolrClient.Builder(url).build()) {
SolrParams query = params(CommonParams.Q, "*:*", CommonParams.DISTRIB, "false");
- for (int i = 0; i < 200; i++) {
+ for (int i = 0; i < 500; i++) {
simpleClient.query(query);
}
trigger.run();
// waitFor delay
assertEquals(0, events.size());
- Thread.sleep(waitForSeconds * 1000 + 2000);
+ Thread.sleep(waitForSeconds * 1000);
+ trigger.run();
+ Thread.sleep(waitForSeconds * 1000);
// should generate replica event
trigger.run();
assertEquals(1, events.size());
@@ -120,7 +150,7 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
for (int i = 0; i < 500; i++) {
solrClient.query(COLL1, query);
}
- Thread.sleep(waitForSeconds * 1000 + 2000);
+ Thread.sleep(waitForSeconds * 1000);
trigger.run();
// should generate collection event
assertEquals(1, events.size());
@@ -134,8 +164,9 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
for (int i = 0; i < 1000; i++) {
solrClient.query(COLL2, query);
+ solrClient.query(COLL1, query);
}
- Thread.sleep(waitForSeconds * 1000 + 2000);
+ Thread.sleep(waitForSeconds * 1000);
trigger.run();
// should generate node and collection event but not for COLL2 because of waitFor
assertEquals(1, events.size());
@@ -144,11 +175,9 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
assertEquals(3, hotNodes.size());
hotNodes.forEach((n, r) -> assertTrue(n, r > rate));
hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
- assertEquals(2, hotCollections.size());
+ assertEquals(1, hotCollections.size());
Rate = hotCollections.get(COLL1);
assertNotNull(Rate);
- Rate = hotCollections.get(COLL2);
- assertNotNull(Rate);
events.clear();
// assert that waitFor prevents new events from being generated
@@ -156,10 +185,11 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
// should not generate any events
assertEquals(0, events.size());
- Thread.sleep(waitForSeconds * 1000 + 2000);
+ Thread.sleep(waitForSeconds * 1000 * 2);
trigger.run();
// should generate node and collection event
assertEquals(1, events.size());
+ event = events.get(0);
hotCollections = (Map<String, Double>)event.getProperty(SearchRateTrigger.HOT_COLLECTIONS);
assertEquals(2, hotCollections.size());
Rate = hotCollections.get(COLL1);
@@ -172,13 +202,137 @@ public class SearchRateTriggerTest extends SolrCloudTestCase {
}
}
- private Map<String, Object> createTriggerProps(long waitForSeconds, double aboveRate, double belowRate) {
+ private static final AtomicDouble mockRate = new AtomicDouble();
+
+ @Test
+ public void testWaitForElapsed() throws Exception {
+ SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ SolrZkClient zkClient = solrClient.getZkStateReader().getZkClient();
+ SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), solrClient) {
+ @Override
+ public NodeStateProvider getNodeStateProvider() {
+ return new SolrClientNodeStateProvider(solrClient) {
+ @Override
+ public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
+ Map<String, Object> values = super.getNodeValues(node, tags);
+ values.keySet().forEach(k -> {
+ values.replace(k, mockRate.get());
+ });
+ return values;
+ }
+ };
+ }
+ };
+ TimeSource timeSource = cloudManager.getTimeSource();
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+ "conf", 2, 2);
+ create.setMaxShardsPerNode(1);
+ create.process(solrClient);
+ CloudTestUtils.waitForState(cloudManager, COLL1, 60, TimeUnit.SECONDS, clusterShape(2, 2));
+
+ long waitForSeconds = 5 + random().nextInt(5);
+ Map<String, Object> props = createTriggerProps(Arrays.asList(COLL1, COLL2), waitForSeconds, 1.0, 0.1);
+ final List<TriggerEvent> events = new ArrayList<>();
+
+ try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger1")) {
+ trigger.configure(loader, cloudManager, props);
+ trigger.init();
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+ trigger.setProcessor(event -> events.add(event));
+
+ // set mock rates
+ mockRate.set(2.0);
+ TimeOut timeOut = new TimeOut(waitForSeconds + 2, TimeUnit.SECONDS, timeSource);
+ // simulate ScheduledTriggers
+ while (!timeOut.hasTimedOut()) {
+ trigger.run();
+ timeSource.sleep(1000);
+ }
+ // violation persisted longer than waitFor - there should be events
+ assertTrue(events.toString(), events.size() > 0);
+ TriggerEvent event = events.get(0);
+ assertEquals(event.toString(), TriggerEventType.SEARCHRATE, event.eventType);
+ Map<String, Object> hotNodes, hotCollections, hotShards;
+ List<ReplicaInfo> hotReplicas;
+ hotNodes = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_NODES);
+ hotCollections = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_COLLECTIONS);
+ hotShards = (Map<String, Object>)event.properties.get(SearchRateTrigger.HOT_SHARDS);
+ hotReplicas = (List<ReplicaInfo>)event.properties.get(SearchRateTrigger.HOT_REPLICAS);
+ assertFalse("no hot nodes?", hotNodes.isEmpty());
+ assertFalse("no hot collections?", hotCollections.isEmpty());
+ assertFalse("no hot shards?", hotShards.isEmpty());
+ assertFalse("no hot replicas?", hotReplicas.isEmpty());
+ }
+
+ mockRate.set(0.0);
+ events.clear();
+
+ try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
+ trigger.configure(loader, cloudManager, props);
+ trigger.init();
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+ trigger.setProcessor(event -> events.add(event));
+
+ mockRate.set(2.0);
+ trigger.run();
+ // waitFor not elapsed
+ assertTrue(events.toString(), events.isEmpty());
+ Thread.sleep(1000);
+ trigger.run();
+ assertTrue(events.toString(), events.isEmpty());
+ Thread.sleep(1000);
+ mockRate.set(0.0);
+ trigger.run();
+ Thread.sleep(TimeUnit.MILLISECONDS.convert(waitForSeconds - 2, TimeUnit.SECONDS));
+ trigger.run();
+
+ // violations persisted shorter than waitFor - there should be no events
+ assertTrue(events.toString(), events.isEmpty());
+
+ }
+ }
+
+ @Test
+ public void testDefaultsAndBackcompat() throws Exception {
+ Map<String, Object> props = new HashMap<>();
+ props.put("rate", 1.0);
+ props.put("collection", "test");
+ SolrResourceLoader loader = cluster.getJettySolrRunner(0).getCoreContainer().getResourceLoader();
+ SolrZkClient zkClient = cluster.getSolrClient().getZkStateReader().getZkClient();
+ SolrCloudManager cloudManager = new SolrClientCloudManager(new ZkDistributedQueueFactory(zkClient), cluster.getSolrClient());
+ try (SearchRateTrigger trigger = new SearchRateTrigger("search_rate_trigger2")) {
+ trigger.configure(loader, cloudManager, props);
+ Map<String, Object> config = trigger.getConfig();
+ Set<String> collections = (Set<String>)config.get(SearchRateTrigger.COLLECTIONS_PROP);
+ assertEquals(collections.toString(), 1, collections.size());
+ assertEquals("test", collections.iterator().next());
+ assertEquals("#ANY", config.get(AutoScalingParams.SHARD));
+ assertEquals("#ANY", config.get(AutoScalingParams.NODE));
+ assertEquals(1.0, config.get(SearchRateTrigger.ABOVE_RATE_PROP));
+ assertEquals(-1.0, config.get(SearchRateTrigger.BELOW_RATE_PROP));
+ assertEquals(SearchRateTrigger.DEFAULT_METRIC, config.get(SearchRateTrigger.METRIC_PROP));
+ assertEquals(SearchRateTrigger.DEFAULT_MAX_OPS, config.get(SearchRateTrigger.MAX_OPS_PROP));
+ assertNull(config.get(SearchRateTrigger.MIN_REPLICAS_PROP));
+ assertEquals(CollectionParams.CollectionAction.ADDREPLICA, config.get(SearchRateTrigger.ABOVE_OP_PROP));
+ assertEquals(CollectionParams.CollectionAction.MOVEREPLICA, config.get(SearchRateTrigger.ABOVE_NODE_OP_PROP));
+ assertEquals(CollectionParams.CollectionAction.DELETEREPLICA, config.get(SearchRateTrigger.BELOW_OP_PROP));
+ assertNull(config.get(SearchRateTrigger.BELOW_NODE_OP_PROP));
+ }
+ }
+
+ private Map<String, Object> createTriggerProps(List<String> collections, long waitForSeconds, double aboveRate, double belowRate) {
Map<String, Object> props = new HashMap<>();
props.put("aboveRate", aboveRate);
props.put("belowRate", belowRate);
props.put("event", "searchRate");
props.put("waitFor", waitForSeconds);
props.put("enabled", true);
+ if (collections != null && !collections.isEmpty()) {
+ props.put("collections", String.join(",", collections));
+ }
List<Map<String, String>> actions = new ArrayList<>(3);
Map<String, String> map = new HashMap<>(2);
map.put("name", "compute_plan");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/14824ca3/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
index 5f469f9..fcefc2f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
@@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
public class SolrClientCloudManager implements SolrCloudManager {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final CloudSolrClient solrClient;
+ protected final CloudSolrClient solrClient;
private final ZkDistribStateManager stateManager;
private final DistributedQueueFactory queueFactory;
private final ZkStateReader zkStateReader;