You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/06/01 14:04:43 UTC
[1/3] lucene-solr:jira/solr-10745: SOLR-10606: Correctly handle #EACH
trigger suspend / resume. Report names of actually modified triggers.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-10745 3e05eefc9 -> b67de922e
SOLR-10606: Correctly handle #EACH trigger suspend / resume. Report names of
actually modified triggers.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ee2be202
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ee2be202
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ee2be202
Branch: refs/heads/jira/solr-10745
Commit: ee2be2024e67533898d4afd4d6010b3b0c8eb2b5
Parents: cae6b6e
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue May 30 16:21:56 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue May 30 16:21:56 2017 +0200
----------------------------------------------------------------------
.../cloud/autoscaling/AutoScalingHandler.java | 148 +++++++++++--------
.../autoscaling/AutoScalingHandlerTest.java | 41 ++++-
.../solr/common/params/AutoScalingParams.java | 53 +++++++
3 files changed, 179 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ee2be202/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
index b065b4a..bf1cc51 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
@@ -58,6 +58,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
import static org.apache.solr.common.params.CommonParams.JSON;
+import static org.apache.solr.common.params.AutoScalingParams.*;
/**
* Handler for /cluster/autoscaling
@@ -74,16 +75,16 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
public AutoScalingHandler(CoreContainer container) {
this.container = container;
Map<String, String> map = new HashMap<>(2);
- map.put("name", "compute_plan");
- map.put("class", "solr.ComputePlanAction");
+ map.put(NAME, "compute_plan");
+ map.put(CLASS, "solr.ComputePlanAction");
DEFAULT_ACTIONS.add(map);
map = new HashMap<>(2);
- map.put("name", "execute_plan");
- map.put("class", "solr.ExecutePlanAction");
+ map.put(NAME, "execute_plan");
+ map.put(CLASS, "solr.ExecutePlanAction");
DEFAULT_ACTIONS.add(map);
map = new HashMap<>(2);
- map.put("name", "log_plan");
- map.put("class", "solr.LogPlanAction");
+ map.put(NAME, "log_plan");
+ map.put(CLASS, "solr.LogPlanAction");
DEFAULT_ACTIONS.add(map);
}
@@ -107,7 +108,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
Map<String, Object> map = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
if (parts.size() == 2) {
rsp.getValues().addAll(map);
- } else if (parts.size() == 3 && "diagnostics".equals(parts.get(2))) {
+ } else if (parts.size() == 3 && DIAGNOSTICS.equals(parts.get(2))) {
handleDiagnostics(rsp, map);
}
} else {
@@ -121,34 +122,34 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
for (CommandOperation op : ops) {
switch (op.name) {
- case "set-trigger":
+ case CMD_SET_TRIGGER:
handleSetTrigger(req, rsp, op);
break;
- case "remove-trigger":
+ case CMD_REMOVE_TRIGGER:
handleRemoveTrigger(req, rsp, op);
break;
- case "set-listener":
+ case CMD_SET_LISTENER:
handleSetListener(req, rsp, op);
break;
- case "remove-listener":
+ case CMD_REMOVE_LISTENER:
handleRemoveListener(req, rsp, op);
break;
- case "suspend-trigger":
+ case CMD_SUSPEND_TRIGGER:
handleSuspendTrigger(req, rsp, op);
break;
- case "resume-trigger":
+ case CMD_RESUME_TRIGGER:
handleResumeTrigger(req, rsp, op);
break;
- case "set-policy":
+ case CMD_SET_POLICY:
handleSetPolicies(req, rsp, op);
break;
- case "remove-policy":
+ case CMD_REMOVE_POLICY:
handleRemovePolicy(req, rsp, op);
break;
- case "set-cluster-preferences":
+ case CMD_SET_CLUSTER_PREFERENCES:
handleSetClusterPreferences(req, rsp, op);
break;
- case "set-cluster-policy":
+ case CMD_SET_CLUSTER_POLICY:
handleSetClusterPolicy(req, rsp, op);
break;
default:
@@ -248,34 +249,48 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
private void handleResumeTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
- String triggerName = op.getStr("name");
+ String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
- if (triggers == null || (!triggers.containsKey(triggerName)) && !"#EACH".equals(triggerName)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- for (Map.Entry<String, Object> entry : triggers.entrySet()) {
- if ("#EACH".equals(triggerName) || triggerName.equals(entry.getKey())) {
- Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
- triggerProps.put("enabled", true);
- zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
+ Set<String> changed = new HashSet<>();
+ if (triggers == null) {
+ if (Policy.EACH.equals(triggerName)) {
+ // no harm no foul
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+ }
+ } else {
+ if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+ }
+ for (Map.Entry<String, Object> entry : triggers.entrySet()) {
+ if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
+ Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
+ Boolean enabled = (Boolean)triggerProps.get(ENABLED);
+ if (enabled != null && !enabled) {
+ triggerProps.put(ENABLED, true);
+ zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
+ changed.add(entry.getKey());
+ }
+ }
}
}
+ rsp.getValues().add("changed", changed);
rsp.getValues().add("result", "success");
}
private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
- String triggerName = op.getStr("name");
+ String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
- String timeout = op.getStr("timeout", null);
+ String timeout = op.getStr(TIMEOUT, null);
Date resumeTime = null;
if (timeout != null) {
try {
@@ -289,24 +304,39 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
- if (triggers == null || (!triggers.containsKey(triggerName)) && !"#EACH".equals(triggerName)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- for (Map.Entry<String, Object> entry : triggers.entrySet()) {
- if ("#EACH".equals(triggerName) || triggerName.equals(entry.getKey())) {
- Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
- triggerProps.put("enabled", false);
- if (resumeTime != null) {
- triggerProps.put("resumeAt", resumeTime.getTime());
+ Set<String> changed = new HashSet<>();
+
+ if (triggers == null) {
+ if (Policy.EACH.equals(triggerName)) {
+ // no harm no foul
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+ }
+ } else {
+ if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+ }
+ for (Map.Entry<String, Object> entry : triggers.entrySet()) {
+ if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
+ Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
+ Boolean enabled = (Boolean)triggerProps.get(ENABLED);
+ if (enabled == null || enabled) {
+ triggerProps.put(ENABLED, false);
+ if (resumeTime != null) {
+ triggerProps.put(RESUME_AT, resumeTime.getTime());
+ }
+ zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
+ changed.add(entry.getKey());
+ }
}
- zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
}
}
+ rsp.getValues().add("changed", changed);
rsp.getValues().add("result", "success");
}
private void handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
- String listenerName = op.getStr("name");
+ String listenerName = op.getStr(NAME);
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
@@ -321,12 +351,12 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
private void handleSetListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
- String listenerName = op.getStr("name");
- String triggerName = op.getStr("trigger");
- List<String> stageNames = op.getStrs("stage", Collections.emptyList());
- String listenerClass = op.getStr("class");
- List<String> beforeActions = op.getStrs("beforeAction", Collections.emptyList());
- List<String> afterActions = op.getStrs("afterAction", Collections.emptyList());
+ String listenerName = op.getStr(NAME);
+ String triggerName = op.getStr(TRIGGER);
+ List<String> stageNames = op.getStrs(STAGE, Collections.emptyList());
+ String listenerClass = op.getStr(CLASS);
+ List<String> beforeActions = op.getStrs(BEFORE_ACTION, Collections.emptyList());
+ List<String> afterActions = op.getStrs(AFTER_ACTION, Collections.emptyList());
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
@@ -367,7 +397,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
actionNames.addAll(beforeActions);
actionNames.addAll(afterActions);
for (Map<String, String> action : actions) {
- actionNames.remove(action.get("name"));
+ actionNames.remove(action.get(NAME));
}
if (!actionNames.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger '" + triggerName + "' does not have actions named: " + actionNames);
@@ -403,19 +433,19 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
private void handleSetTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
- String triggerName = op.getStr("name");
+ String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
- String eventTypeStr = op.getStr("event");
+ String eventTypeStr = op.getStr(EVENT);
if (eventTypeStr == null || eventTypeStr.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The event type cannot be null or empty in trigger: " + triggerName);
}
AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(eventTypeStr.trim().toUpperCase(Locale.ROOT));
- String waitForStr = op.getStr("waitFor", null);
+ String waitForStr = op.getStr(WAIT_FOR, null);
if (waitForStr != null) {
int seconds = 0;
try {
@@ -423,25 +453,25 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'waitFor' value in trigger: " + triggerName);
}
- op.getDataMap().put("waitFor", seconds);
+ op.getDataMap().put(WAIT_FOR, seconds);
}
- Integer lowerBound = op.getInt("lowerBound", null);
- Integer upperBound = op.getInt("upperBound", null);
+ Integer lowerBound = op.getInt(LOWER_BOUND, null);
+ Integer upperBound = op.getInt(UPPER_BOUND, null);
- List<Map<String, String>> actions = (List<Map<String, String>>) op.getVal("actions");
+ List<Map<String, String>> actions = (List<Map<String, String>>) op.getVal(ACTIONS);
if (actions == null) {
actions = DEFAULT_ACTIONS;
- op.getDataMap().put("actions", actions);
+ op.getDataMap().put(ACTIONS, actions);
}
// validate that we can load all the actions
// todo nocommit -- what about MemClassLoader?
for (Map<String, String> action : actions) {
- if (!action.containsKey("name") || !action.containsKey("class")) {
+ if (!action.containsKey(NAME) || !action.containsKey(CLASS)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No 'name' or 'class' specified for action: " + action);
}
- String klass = action.get("class");
+ String klass = action.get(CLASS);
try {
container.getResourceLoader().findClass(klass, TriggerAction.class);
} catch (Exception e) {
@@ -474,8 +504,8 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
private void handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
- String triggerName = op.getStr("name");
- boolean removeListeners = op.getBoolean("removeListeners", false);
+ String triggerName = op.getStr(NAME);
+ boolean removeListeners = op.getBoolean(REMOVE_LISTENERS, false);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
@@ -491,7 +521,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
if (listeners != null) {
for (Map.Entry<String, Map<String, Object>> entry : listeners.entrySet()) {
Map<String, Object> listenerProps = entry.getValue();
- if (triggerName.equals(listenerProps.get("trigger")) && !removeListeners) {
+ if (triggerName.equals(listenerProps.get(TRIGGER)) && !removeListeners) {
activeListeners.add(entry.getKey());
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ee2be202/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
index 1a2a5e5..39dea17 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
@@ -75,14 +75,34 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
@Test
public void testSuspendTrigger() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
+ String suspendEachCommand = "{\n" +
+ "\t\"suspend-trigger\" : {\n" +
+ "\t\t\"name\" : \"" + Policy.EACH + "\"\n" +
+ "\t}\n" +
+ "}";
+ String resumeEachCommand = "{\n" +
+ "\t\"resume-trigger\" : {\n" +
+ "\t\t\"name\" : \"" + Policy.EACH + "\"\n" +
+ "\t}\n" +
+ "}";
+ // these should be no-ops because there are no triggers, and it should succeed
+ SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendEachCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ assertEquals(response.get("changed").toString(), "[]");
+ req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeEachCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ assertEquals(response.get("changed").toString(), "[]");
+
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_lost_trigger'," +
"'event' : 'nodeLost'," +
"'waitFor' : '10m'," +
"'enabled' : true}}";
- SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
+ req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
setTriggerCommand = "{" +
@@ -105,6 +125,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
+ assertEquals(response.get("changed").toString(), "[node_lost_trigger]");
byte[] data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
@@ -122,12 +143,15 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
suspendTriggerCommand = "{" +
"'suspend-trigger' : {" +
- "'name' : '#EACH'" +
+ "'name' : '" + Policy.EACH + "'" +
"}" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
+ List<String> changed = (List<String>)response.get("changed");
+ assertEquals(1, changed.size());
+ assertTrue(changed.contains("node_added_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");
@@ -148,6 +172,9 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
+ changed = (List<String>)response.get("changed");
+ assertEquals(1, changed.size());
+ assertTrue(changed.contains("node_added_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");
@@ -162,12 +189,15 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
resumeTriggerCommand = "{" +
"'resume-trigger' : {" +
- "'name' : '#EACH'" +
+ "'name' : '" + Policy.EACH + "'" +
"}" +
"}";
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
+ changed = (List<String>)response.get("changed");
+ assertEquals(1, changed.size());
+ assertTrue(changed.contains("node_lost_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");
@@ -189,6 +219,9 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
+ changed = (List<String>)response.get("changed");
+ assertEquals(1, changed.size());
+ assertTrue(changed.contains("node_lost_trigger"));
data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
loaded = ZkNodeProps.load(data);
triggers = (Map<String, Object>) loaded.get("triggers");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ee2be202/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
new file mode 100644
index 0000000..1b5feb9
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.params;
+
+/**
+ * Requests parameters for autoscaling.
+ */
+public interface AutoScalingParams {
+
+ // parameters
+ String DIAGNOSTICS = "diagnostics";
+ String NAME = "name";
+ String TRIGGER = "trigger";
+ String EVENT = "event";
+ String ACTIONS = "actions";
+ String WAIT_FOR = "waitFor";
+ String LOWER_BOUND = "lowerBound";
+ String UPPER_BOUND = "upperBound";
+ String STAGE = "stage";
+ String CLASS = "class";
+ String ENABLED = "enabled";
+ String RESUME_AT = "resumeAt";
+ String BEFORE_ACTION = "beforeAction";
+ String AFTER_ACTION = "afterAction";
+ String TIMEOUT = "timeout";
+ String REMOVE_LISTENERS = "removeListeners";
+
+ // commands
+ String CMD_SET_TRIGGER = "set-trigger";
+ String CMD_REMOVE_TRIGGER = "remove-trigger";
+ String CMD_SET_LISTENER = "set-listener";
+ String CMD_REMOVE_LISTENER = "remove-listener";
+ String CMD_SUSPEND_TRIGGER = "suspend-trigger";
+ String CMD_RESUME_TRIGGER = "resume-trigger";
+ String CMD_SET_POLICY = "set-policy";
+ String CMD_REMOVE_POLICY = "remove-policy";
+ String CMD_SET_CLUSTER_PREFERENCES = "set-cluster-preferences";
+ String CMD_SET_CLUSTER_POLICY = "set-cluster-policy";
+}
[3/3] lucene-solr:jira/solr-10745: SOLR-10745: Fix buggy element
removal in triggers. Move marker processing to trigger init(). Clean old
markers on Overseer start if there are no triggers to consume them.
Posted by ab...@apache.org.
SOLR-10745: Fix buggy element removal in triggers. Move marker processing to
trigger init(). Clean old markers on Overseer start if there are no triggers to
consume them.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b67de922
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b67de922
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b67de922
Branch: refs/heads/jira/solr-10745
Commit: b67de922e409d8c88b9e82655ed4a3bf0d35d4f7
Parents: c0f3f0d
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jun 1 16:02:13 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jun 1 16:02:13 2017 +0200
----------------------------------------------------------------------
.../org/apache/solr/cloud/ZkController.java | 12 +--
.../cloud/autoscaling/AutoScalingConfig.java | 97 ++++++++++++++++++++
.../cloud/autoscaling/NodeAddedTrigger.java | 60 +++++++-----
.../solr/cloud/autoscaling/NodeLostTrigger.java | 56 ++++++-----
.../autoscaling/OverseerTriggerThread.java | 52 ++++++++++-
.../solr/cloud/autoscaling/TriggerBase.java | 8 +-
.../solr/cloud/autoscaling/TestPolicyCloud.java | 1 -
.../autoscaling/TriggerIntegrationTest.java | 91 +++++++++++-------
8 files changed, 285 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a9004b8..37019ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -741,11 +741,9 @@ public class ZkController {
LiveNodesListener listener = (oldNodes, newNodes) -> {
oldNodes.removeAll(newNodes);
if (oldNodes.isEmpty()) { // only added nodes
- log.debug("-- skip, only new nodes: " + newNodes);
return;
}
if (isClosed) {
- log.debug("-- skip, closed: old=" + oldNodes + ", new=" + newNodes);
return;
}
// if this node is in the top three then attempt to create nodeLost message
@@ -755,21 +753,17 @@ public class ZkController {
break;
}
if (i > 2) {
- log.debug("-- skip, " + getNodeName() + " not in the top 3 of " + newNodes);
return; // this node is not in the top three
}
i++;
}
+
for (String n : oldNodes) {
String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
try {
- // nocommit decide between EPHEMERAL vs. PERSISTENT, the latter needs
- // explicit cleanup on cluster restart if there are no nodeLost triggers
zkClient.create(path, null, CreateMode.PERSISTENT, true);
- log.debug("-- created " + path);
} catch (KeeperException.NodeExistsException e) {
// someone else already created this node - ignore
- log.debug("-- skip, already exists " + path);
} catch (KeeperException | InterruptedException e1) {
log.warn("Unable to register nodeLost path for " + n, e1);
}
@@ -857,8 +851,8 @@ public class ZkController {
List<Op> ops = new ArrayList<>(2);
ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
if (!zkClient.exists(nodeAddedPath, true)) {
- // nocommit use EPHEMERAL or PERSISTENT?
- // EPHEMERAL will disappear if this node shuts down, PERSISTENT will need an explicit cleanup
+ // use EPHEMERAL so that it disappears if this node goes down
+ // and no other action is taken
ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
}
zkClient.multi(ops, true);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
new file mode 100644
index 0000000..2877cb9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
@@ -0,0 +1,97 @@
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.Utils;
+
+/**
+ * Simple bean representation of <code>autoscaling.json</code>, which parses data
+ * lazily.
+ */
+public class AutoScalingConfig {
+
+ private final Map<String, Object> jsonMap;
+
+ private Policy policy;
+ private Map<String, TriggerConfig> triggers;
+ private Map<String, ListenerConfig> listeners;
+
+ /**
+ * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.TriggerListener} config.
+ */
+ public static class ListenerConfig {
+ public String trigger;
+ public List<String> stages;
+ public String listenerClass;
+ public List<Map<String, String>> beforeActions;
+ public List<Map<String, String>> afterActions;
+
+ public ListenerConfig(Map<String, Object> properties) {
+ trigger = (String)properties.get(AutoScalingParams.TRIGGER);
+ stages = (List<String>)properties.getOrDefault(AutoScalingParams.STAGE, Collections.emptyList());
+ listenerClass = (String)properties.get(AutoScalingParams.CLASS);
+ beforeActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.BEFORE_ACTION, Collections.emptyList());
+ afterActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.AFTER_ACTION, Collections.emptyList());
+ }
+ }
+
+ /**
+ * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
+ */
+ public static class TriggerConfig {
+ public final AutoScaling.EventType eventType;
+ public final Map<String, Object> properties = new HashMap<>();
+
+ public TriggerConfig(Map<String, Object> properties) {
+ String event = (String) properties.get(AutoScalingParams.EVENT);
+ this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
+ this.properties.putAll(properties);
+ }
+ }
+
+ public AutoScalingConfig(Map<String, Object> jsonMap) {
+ this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
+ }
+
+ public Policy getPolicy() {
+ if (policy == null) {
+ policy = new Policy(jsonMap);
+ }
+ return policy;
+ }
+
+ public Map<String, TriggerConfig> getTriggerConfigs() {
+ if (triggers == null) {
+ Map<String, Object> trigMap = (Map<String, Object>)jsonMap.get("triggers");
+ if (trigMap == null) {
+ triggers = Collections.emptyMap();
+ } else {
+ triggers = new HashMap<>(trigMap.size());
+ for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
+ triggers.put(entry.getKey(), new TriggerConfig((Map<String, Object>)entry.getValue()));
+ }
+ }
+ }
+ return triggers;
+ }
+
+ public Map<String, ListenerConfig> getListenerConfigs() {
+ if (listeners == null) {
+ Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
+ if (map == null) {
+ listeners = Collections.emptyMap();
+ } else {
+ listeners = new HashMap<>(map.size());
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ listeners.put(entry.getKey(), new ListenerConfig((Map<String, Object>)entry.getValue()));
+ }
+ }
+ }
+ return listeners;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index f304ba7..7a46fc7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -62,7 +62,7 @@ public class NodeAddedTrigger extends TriggerBase {
private Set<String> lastLiveNodes;
- private Map<String, Long> nodeNameVsTimeAdded = new ConcurrentHashMap<>();
+ private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
public NodeAddedTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
@@ -99,6 +99,20 @@ public class NodeAddedTrigger extends TriggerBase {
actions.get(i).init(map);
}
}
+ // pick up added nodes for which marker paths were created
+ try {
+ List<String> added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+ added.forEach(n -> {
+ log.debug("Adding node from marker path: {}", n);
+ nodeNameVsTimeAdded.put(n, timeSource.getTime());
+ removeNodeAddedMarker(n);
+ });
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception retrieving nodeLost markers", e);
+ }
+
}
@Override
@@ -225,28 +239,13 @@ public class NodeAddedTrigger extends TriggerBase {
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
long eventTime = timeSource.getTime();
- nodeNameVsTimeAdded.put(n, eventTime);
log.debug("Tracking new node: {} at time {}", n, eventTime);
+ nodeNameVsTimeAdded.put(n, eventTime);
});
- // pick up added nodes for which marker paths were created
- try {
- List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
- lost.forEach(n -> {
- log.debug("Adding node from marker path: {}", n);
- nodeNameVsTimeAdded.put(n, timeSource.getTime());
- try {
- container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n, -1, true);
- } catch (KeeperException | InterruptedException e) {
- log.debug("Exception removing nodeAdded marker " + n, e);
- }
- });
- } catch (KeeperException | InterruptedException e) {
- log.warn("Exception retrieving nodeLost markers", e);
- }
-
// has enough time expired to trigger events for a node?
- for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
+ for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeAdded.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
long now = timeSource.getTime();
@@ -257,20 +256,35 @@ public class NodeAddedTrigger extends TriggerBase {
log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
// remove from tracking set only if the fire was accepted
- trackingKeySet.remove(nodeName);
+ it.remove();
+ removeNodeAddedMarker(nodeName);
}
} else {
- trackingKeySet.remove(nodeName);
+ it.remove();
+ removeNodeAddedMarker(nodeName);
}
}
}
-
lastLiveNodes = new HashSet(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeAddedTrigger", e);
}
}
+ private void removeNodeAddedMarker(String nodeName) {
+ String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
+ try {
+ if (container.getZkController().getZkClient().exists(path, true)) {
+ container.getZkController().getZkClient().delete(path, -1, true);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.debug("Exception removing nodeAdded marker " + nodeName, e);
+ }
+
+ }
+
@Override
public boolean isClosed() {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 6450bda..2af4cc5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -62,7 +62,7 @@ public class NodeLostTrigger extends TriggerBase {
private Set<String> lastLiveNodes;
- private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
+ private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
public NodeLostTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
@@ -98,6 +98,19 @@ public class NodeLostTrigger extends TriggerBase {
actions.get(i).init(map);
}
}
+ // pick up lost nodes for which marker paths were created
+ try {
+ List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+ lost.forEach(n -> {
+ log.debug("Adding lost node from marker path: {}", n);
+ nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+ removeNodeLostMarker(n);
+ });
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception retrieving nodeLost markers", e);
+ }
}
@Override
@@ -227,24 +240,9 @@ public class NodeLostTrigger extends TriggerBase {
nodeNameVsTimeRemoved.put(n, timeSource.getTime());
});
- // pick up lost nodes for which marker paths were created
- try {
- List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
- lost.forEach(n -> {
- log.debug("Adding lost node from marker path: {}", n);
- nodeNameVsTimeRemoved.put(n, timeSource.getTime());
- try {
- container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n, -1, true);
- } catch (KeeperException | InterruptedException e) {
- log.warn("Exception removing nodeLost marker " + n, e);
- }
- });
- } catch (KeeperException | InterruptedException e) {
- log.warn("Exception retrieving nodeLost markers", e);
- }
-
// has enough time expired to trigger events for a node?
- for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
+ for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeRemoved.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeRemoved = entry.getValue();
if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
@@ -253,20 +251,34 @@ public class NodeLostTrigger extends TriggerBase {
if (listener != null) {
log.debug("NodeLostTrigger firing registered listener");
if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
- trackingKeySet.remove(nodeName);
+ it.remove();
+ removeNodeLostMarker(nodeName);
}
} else {
- trackingKeySet.remove(nodeName);
+ it.remove();
+ removeNodeLostMarker(nodeName);
}
}
}
-
lastLiveNodes = new HashSet<>(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeLostTrigger", e);
}
}
+ private void removeNodeLostMarker(String nodeName) {
+ String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
+ try {
+ if (container.getZkController().getZkClient().exists(path, true)) {
+ container.getZkController().getZkClient().delete(path, -1, true);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception removing nodeLost marker " + nodeName, e);
+ }
+ }
+
@Override
public boolean isClosed() {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 4a89ce7..91146b6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -169,10 +170,59 @@ public class OverseerTriggerThread implements Runnable, Closeable {
scheduledTriggers.remove(managedTriggerName);
}
}
+ // check for nodeLost triggers in the current config, and if
+ // absent then clean up old nodeLost / nodeAdded markers
+ boolean cleanOldNodeLostMarkers = true;
+ boolean cleanOldNodeAddedMarkers = true;
// add new triggers and/or replace and close the replaced triggers
for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
+ if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODELOST)) {
+ cleanOldNodeLostMarkers = false;
+ }
+ if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODEADDED)) {
+ cleanOldNodeAddedMarkers = false;
+ }
scheduledTriggers.add(entry.getValue());
}
+ if (cleanOldNodeLostMarkers) {
+ log.debug("-- clean old nodeLost markers");
+ try {
+ List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+ markers.forEach(n -> {
+ removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
+ });
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Error removing old nodeLost markers", e);
+ }
+ }
+ if (cleanOldNodeAddedMarkers) {
+ log.debug("-- clean old nodeAdded markers");
+ try {
+ List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+ markers.forEach(n -> {
+ removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
+ });
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Error removing old nodeAdded markers", e);
+ }
+
+ }
+ }
+ }
+
+ private void removeNodeMarker(String path, String nodeName) {
+ path = path + "/" + nodeName;
+ try {
+ zkClient.delete(path, -1, true);
+ log.debug(" -- deleted " + path);
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Error removing old marker " + path, e);
}
}
@@ -250,7 +300,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
Map<String, Object> props = (Map<String, Object>) entry.getValue();
- String event = (String) props.get("event");
+ String event = (String) props.get(AutoScalingParams.EVENT);
AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
String triggerName = entry.getKey();
triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
index ef9a3cf..7aff846 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
@@ -97,11 +97,11 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
LOG.warn("Exception getting trigger state '" + path + "'", e);
}
if (data != null) {
- Map<String, Object> state = (Map<String, Object>)Utils.fromJSON(data);
+ Map<String, Object> restoredState = (Map<String, Object>)Utils.fromJSON(data);
// make sure lastState is sorted
- state = Utils.getDeepCopy(state, 10, false, true);;
- setState(state);
- lastState = state;
+ restoredState = Utils.getDeepCopy(restoredState, 10, false, true);
+ setState(restoredState);
+ lastState = restoredState;
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
index 5038278..a809873 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
@@ -30,7 +30,6 @@ import org.apache.solr.cloud.OverseerTaskProcessor;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index b2c95b7..ae3f72d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -139,6 +139,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
paths.forEach(n -> {
try {
ZKUtil.deleteRecursive(zkClient().getSolrZooKeeper(), path + "/" + n);
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
} catch (KeeperException | InterruptedException e) {
log.warn("Error deleting old data", e);
}
@@ -634,10 +636,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public void process(TriggerEvent event) {
+ log.info("-- event: " + event);
events.add(event);
getActionStarted().countDown();
try {
- Thread.sleep(5000);
+ Thread.sleep(eventQueueActionWait);
triggerFired.compareAndSet(false, true);
getActionCompleted().countDown();
} catch (InterruptedException e) {
@@ -658,6 +661,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
+ public static long eventQueueActionWait = 5000;
+
@Test
public void testEventQueue() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
@@ -696,19 +701,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertNotNull(nodeAddedEvent);
// but action did not complete yet so the event is still enqueued
assertFalse(triggerFired.get());
+ events.clear();
actionStarted = new CountDownLatch(1);
+ eventQueueActionWait = 1;
// kill overseer leader
cluster.stopJettySolrRunner(overseerLeaderIndex);
Thread.sleep(5000);
+ // new overseer leader should be elected and run triggers
await = actionInterrupted.await(3, TimeUnit.SECONDS);
assertTrue("action wasn't interrupted", await);
- // new overseer leader should be elected and run triggers
- newNode = cluster.startJettySolrRunner();
- // it should fire again but not complete yet
+ // it should fire again from enqueued event
await = actionStarted.await(60, TimeUnit.SECONDS);
TriggerEvent replayedEvent = events.iterator().next();
assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
- assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
+ assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
await = actionCompleted.await(10, TimeUnit.SECONDS);
assertTrue(triggerFired.get());
}
@@ -743,6 +749,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
+ events.clear();
+
JettySolrRunner newNode = cluster.startJettySolrRunner();
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
@@ -797,8 +805,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
public static class TestEventMarkerAction implements TriggerAction {
- // sanity check that an action instance is only invoked once
- private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
public TestEventMarkerAction() {
actionConstructorCalled.countDown();
@@ -806,7 +812,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public String getName() {
- return "TestTriggerAction";
+ return "TestEventMarkerAction";
}
@Override
@@ -841,17 +847,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public void init(Map<String, String> args) {
- log.info("TestTriggerAction init");
+ log.info("TestEventMarkerAction init");
actionInitCalled.countDown();
}
}
@Test
- public void testNodeEventsRegistration() throws Exception {
+ public void testNodeMarkersRegistration() throws Exception {
// for this test we want to create two triggers so we must assert that the actions were created twice
actionInitCalled = new CountDownLatch(2);
// similarly we want both triggers to fire
- triggerFiredLatch = new CountDownLatch(3);
+ triggerFiredLatch = new CountDownLatch(2);
TestLiveNodesListener listener = registerLiveNodesListener();
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
@@ -875,7 +881,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
listener.reset();
- // stop overseer, which should also cause nodeLost event
+ // stop overseer
+ log.info("====== KILL OVERSEER 1");
cluster.stopJettySolrRunner(overseerLeaderIndex);
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
@@ -883,12 +890,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(1, listener.lostNodes.size());
assertEquals(overseerLeader, listener.lostNodes.iterator().next());
assertEquals(0, listener.addedNodes.size());
- // verify that a znode exists
+ // wait until the new overseer is up
+ Thread.sleep(5000);
+ // verify that a znode does NOT exist - there's no nodeLost trigger,
+ // so the new overseer cleaned up existing nodeLost markers
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
- assertTrue("Path " + pathLost + " wasn't created", zkClient().exists(pathLost, true));
+ assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
listener.reset();
// create another node
+ log.info("====== ADD NODE 1");
JettySolrRunner node1 = cluster.startJettySolrRunner();
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
@@ -902,6 +913,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// set up triggers
CloudSolrClient solrClient = cluster.getSolrClient();
+ log.info("====== ADD TRIGGERS");
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
@@ -926,25 +938,40 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
- fail("Both triggers should have fired by now");
- }
- assertEquals(3, events.size());
- // 2 nodeAdded, 1 nodeLost
- int nodeAdded = 0;
- int nodeLost = 0;
- for (TriggerEvent ev : events) {
- String nodeName = (String)ev.getProperty(TriggerEvent.NODE_NAME);
- if (ev.eventType.equals(AutoScaling.EventType.NODELOST)) {
- assertEquals(overseerLeader, nodeName);
- nodeLost++;
- } else if (ev.eventType.equals(AutoScaling.EventType.NODEADDED)) {
- assertTrue(nodeName + " not one of: " + node.getNodeName() + ", " + node1.getNodeName(),
- nodeName.equals(node.getNodeName()) || nodeName.equals(node1.getNodeName()));
- nodeAdded++;
+ overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ overseerLeader = (String) overSeerStatus.get("leader");
+ overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
}
}
- assertEquals(1, nodeLost);
- assertEquals(2, nodeAdded);
+
+ Thread.sleep(5000);
+ // old nodeAdded markers should be consumed now by nodeAdded trigger
+ // but it doesn't result in new events because all nodes have been added
+ // before we configured the trigger
+ assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+
+ listener.reset();
+ events.clear();
+ triggerFiredLatch = new CountDownLatch(1);
+ // kill overseer again
+ log.info("====== KILL OVERSEER 2");
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+
+
+ if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+ fail("Trigger should have fired by now");
+ }
+ assertEquals(1, events.size());
+ TriggerEvent ev = events.iterator().next();
+ assertEquals(overseerLeader, ev.getProperty(TriggerEvent.NODE_NAME));
+ assertEquals(AutoScaling.EventType.NODELOST, ev.getEventType());
}
}
[2/3] lucene-solr:jira/solr-10745: Merge branch 'feature/autoscaling'
into jira/solr-10745
Posted by ab...@apache.org.
Merge branch 'feature/autoscaling' into jira/solr-10745
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c0f3f0dc
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c0f3f0dc
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c0f3f0dc
Branch: refs/heads/jira/solr-10745
Commit: c0f3f0dc921ec5e8310d799a0bae073d8da2f848
Parents: 3e05eef ee2be20
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed May 31 12:09:11 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed May 31 12:09:11 2017 +0200
----------------------------------------------------------------------
.../cloud/autoscaling/AutoScalingHandler.java | 148 +++++++++++--------
.../autoscaling/AutoScalingHandlerTest.java | 41 ++++-
.../solr/common/params/AutoScalingParams.java | 53 +++++++
3 files changed, 179 insertions(+), 63 deletions(-)
----------------------------------------------------------------------