You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/06/01 14:04:43 UTC

[1/3] lucene-solr:jira/solr-10745: SOLR-10606: Correctly handle #EACH trigger suspend / resume. Report names of actually modified triggers.

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-10745 3e05eefc9 -> b67de922e


SOLR-10606: Correctly handle #EACH trigger suspend / resume. Report names of
actually modified triggers.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ee2be202
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ee2be202
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ee2be202

Branch: refs/heads/jira/solr-10745
Commit: ee2be2024e67533898d4afd4d6010b3b0c8eb2b5
Parents: cae6b6e
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue May 30 16:21:56 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue May 30 16:21:56 2017 +0200

----------------------------------------------------------------------
 .../cloud/autoscaling/AutoScalingHandler.java   | 148 +++++++++++--------
 .../autoscaling/AutoScalingHandlerTest.java     |  41 ++++-
 .../solr/common/params/AutoScalingParams.java   |  53 +++++++
 3 files changed, 179 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ee2be202/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
index b065b4a..bf1cc51 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
@@ -58,6 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
 import static org.apache.solr.common.params.CommonParams.JSON;
+import static org.apache.solr.common.params.AutoScalingParams.*;
 
 /**
  * Handler for /cluster/autoscaling
@@ -74,16 +75,16 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
   public AutoScalingHandler(CoreContainer container) {
     this.container = container;
     Map<String, String> map = new HashMap<>(2);
-    map.put("name", "compute_plan");
-    map.put("class", "solr.ComputePlanAction");
+    map.put(NAME, "compute_plan");
+    map.put(CLASS, "solr.ComputePlanAction");
     DEFAULT_ACTIONS.add(map);
     map = new HashMap<>(2);
-    map.put("name", "execute_plan");
-    map.put("class", "solr.ExecutePlanAction");
+    map.put(NAME, "execute_plan");
+    map.put(CLASS, "solr.ExecutePlanAction");
     DEFAULT_ACTIONS.add(map);
     map = new HashMap<>(2);
-    map.put("name", "log_plan");
-    map.put("class", "solr.LogPlanAction");
+    map.put(NAME, "log_plan");
+    map.put(CLASS, "solr.LogPlanAction");
     DEFAULT_ACTIONS.add(map);
   }
 
@@ -107,7 +108,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
         Map<String, Object> map = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
         if (parts.size() == 2)  {
           rsp.getValues().addAll(map);
-        } else if (parts.size() == 3 && "diagnostics".equals(parts.get(2))) {
+        } else if (parts.size() == 3 && DIAGNOSTICS.equals(parts.get(2))) {
           handleDiagnostics(rsp, map);
         }
       } else {
@@ -121,34 +122,34 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
         }
         for (CommandOperation op : ops) {
           switch (op.name) {
-            case "set-trigger":
+            case CMD_SET_TRIGGER:
               handleSetTrigger(req, rsp, op);
               break;
-            case "remove-trigger":
+            case CMD_REMOVE_TRIGGER:
               handleRemoveTrigger(req, rsp, op);
               break;
-            case "set-listener":
+            case CMD_SET_LISTENER:
               handleSetListener(req, rsp, op);
               break;
-            case "remove-listener":
+            case CMD_REMOVE_LISTENER:
               handleRemoveListener(req, rsp, op);
               break;
-            case "suspend-trigger":
+            case CMD_SUSPEND_TRIGGER:
               handleSuspendTrigger(req, rsp, op);
               break;
-            case "resume-trigger":
+            case CMD_RESUME_TRIGGER:
               handleResumeTrigger(req, rsp, op);
               break;
-            case "set-policy":
+            case CMD_SET_POLICY:
               handleSetPolicies(req, rsp, op);
               break;
-            case "remove-policy":
+            case CMD_REMOVE_POLICY:
               handleRemovePolicy(req, rsp, op);
               break;
-            case "set-cluster-preferences":
+            case CMD_SET_CLUSTER_PREFERENCES:
               handleSetClusterPreferences(req, rsp, op);
               break;
-            case "set-cluster-policy":
+            case CMD_SET_CLUSTER_POLICY:
               handleSetClusterPolicy(req, rsp, op);
               break;
             default:
@@ -248,34 +249,48 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
   }
 
   private void handleResumeTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
-    String triggerName = op.getStr("name");
+    String triggerName = op.getStr(NAME);
 
     if (triggerName == null || triggerName.trim().length() == 0) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
     }
     Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
     Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
-    if (triggers == null || (!triggers.containsKey(triggerName)) && !"#EACH".equals(triggerName)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
-    }
-    for (Map.Entry<String, Object> entry : triggers.entrySet()) {
-      if ("#EACH".equals(triggerName) || triggerName.equals(entry.getKey())) {
-        Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
-        triggerProps.put("enabled", true);
-        zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
+    Set<String> changed = new HashSet<>();
+    if (triggers == null) {
+      if (Policy.EACH.equals(triggerName)) {
+        // no harm no foul
+      } else {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+      }
+    } else {
+      if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+      }
+      for (Map.Entry<String, Object> entry : triggers.entrySet()) {
+        if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
+          Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
+          Boolean enabled = (Boolean)triggerProps.get(ENABLED);
+          if (enabled != null && !enabled) {
+            triggerProps.put(ENABLED, true);
+            zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
+            changed.add(entry.getKey());
+          }
+        }
       }
     }
+    rsp.getValues().add("changed", changed);
     rsp.getValues().add("result", "success");
   }
 
   private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
-    String triggerName = op.getStr("name");
+    String triggerName = op.getStr(NAME);
 
     if (triggerName == null || triggerName.trim().length() == 0) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
     }
 
-    String timeout = op.getStr("timeout", null);
+    String timeout = op.getStr(TIMEOUT, null);
     Date resumeTime = null;
     if (timeout != null) {
       try {
@@ -289,24 +304,39 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
 
     Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
     Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
-    if (triggers == null || (!triggers.containsKey(triggerName)) && !"#EACH".equals(triggerName)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
-    }
-    for (Map.Entry<String, Object> entry : triggers.entrySet()) {
-      if ("#EACH".equals(triggerName) || triggerName.equals(entry.getKey())) {
-        Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
-        triggerProps.put("enabled", false);
-        if (resumeTime != null) {
-          triggerProps.put("resumeAt", resumeTime.getTime());
+    Set<String> changed = new HashSet<>();
+
+    if (triggers == null) {
+      if (Policy.EACH.equals(triggerName)) {
+      // no harm no foul
+      } else {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+      }
+    } else {
+      if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+      }
+      for (Map.Entry<String, Object> entry : triggers.entrySet()) {
+        if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
+          Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
+          Boolean enabled = (Boolean)triggerProps.get(ENABLED);
+          if (enabled == null || enabled) {
+            triggerProps.put(ENABLED, false);
+            if (resumeTime != null) {
+              triggerProps.put(RESUME_AT, resumeTime.getTime());
+            }
+            zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
+            changed.add(entry.getKey());
+          }
         }
-        zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
       }
     }
+    rsp.getValues().add("changed", changed);
     rsp.getValues().add("result", "success");
   }
 
   private void handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
-    String listenerName = op.getStr("name");
+    String listenerName = op.getStr(NAME);
 
     if (listenerName == null || listenerName.trim().length() == 0) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
@@ -321,12 +351,12 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
   }
 
   private void handleSetListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
-    String listenerName = op.getStr("name");
-    String triggerName = op.getStr("trigger");
-    List<String> stageNames = op.getStrs("stage", Collections.emptyList());
-    String listenerClass = op.getStr("class");
-    List<String> beforeActions = op.getStrs("beforeAction", Collections.emptyList());
-    List<String> afterActions = op.getStrs("afterAction", Collections.emptyList());
+    String listenerName = op.getStr(NAME);
+    String triggerName = op.getStr(TRIGGER);
+    List<String> stageNames = op.getStrs(STAGE, Collections.emptyList());
+    String listenerClass = op.getStr(CLASS);
+    List<String> beforeActions = op.getStrs(BEFORE_ACTION, Collections.emptyList());
+    List<String> afterActions = op.getStrs(AFTER_ACTION, Collections.emptyList());
 
     if (listenerName == null || listenerName.trim().length() == 0) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
@@ -367,7 +397,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
     actionNames.addAll(beforeActions);
     actionNames.addAll(afterActions);
     for (Map<String, String> action : actions) {
-      actionNames.remove(action.get("name"));
+      actionNames.remove(action.get(NAME));
     }
     if (!actionNames.isEmpty()) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger '" + triggerName + "' does not have actions named: " + actionNames);
@@ -403,19 +433,19 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
   }
 
   private void handleSetTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
-    String triggerName = op.getStr("name");
+    String triggerName = op.getStr(NAME);
 
     if (triggerName == null || triggerName.trim().length() == 0) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
     }
 
-    String eventTypeStr = op.getStr("event");
+    String eventTypeStr = op.getStr(EVENT);
     if (eventTypeStr == null || eventTypeStr.trim().length() == 0) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The event type cannot be null or empty in trigger: " + triggerName);
     }
     AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(eventTypeStr.trim().toUpperCase(Locale.ROOT));
 
-    String waitForStr = op.getStr("waitFor", null);
+    String waitForStr = op.getStr(WAIT_FOR, null);
     if (waitForStr != null) {
       int seconds = 0;
       try {
@@ -423,25 +453,25 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
       } catch (IllegalArgumentException e) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'waitFor' value in trigger: " + triggerName);
       }
-      op.getDataMap().put("waitFor", seconds);
+      op.getDataMap().put(WAIT_FOR, seconds);
     }
 
-    Integer lowerBound = op.getInt("lowerBound", null);
-    Integer upperBound = op.getInt("upperBound", null);
+    Integer lowerBound = op.getInt(LOWER_BOUND, null);
+    Integer upperBound = op.getInt(UPPER_BOUND, null);
 
-    List<Map<String, String>> actions = (List<Map<String, String>>) op.getVal("actions");
+    List<Map<String, String>> actions = (List<Map<String, String>>) op.getVal(ACTIONS);
     if (actions == null) {
       actions = DEFAULT_ACTIONS;
-      op.getDataMap().put("actions", actions);
+      op.getDataMap().put(ACTIONS, actions);
     }
 
     // validate that we can load all the actions
     // todo nocommit -- what about MemClassLoader?
     for (Map<String, String> action : actions) {
-      if (!action.containsKey("name") || !action.containsKey("class")) {
+      if (!action.containsKey(NAME) || !action.containsKey(CLASS)) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No 'name' or 'class' specified for action: " + action);
       }
-      String klass = action.get("class");
+      String klass = action.get(CLASS);
       try {
         container.getResourceLoader().findClass(klass, TriggerAction.class);
       } catch (Exception e) {
@@ -474,8 +504,8 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
   }
 
   private void handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
-    String triggerName = op.getStr("name");
-    boolean removeListeners = op.getBoolean("removeListeners", false);
+    String triggerName = op.getStr(NAME);
+    boolean removeListeners = op.getBoolean(REMOVE_LISTENERS, false);
 
     if (triggerName == null || triggerName.trim().length() == 0) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
@@ -491,7 +521,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
     if (listeners != null) {
       for (Map.Entry<String, Map<String, Object>> entry : listeners.entrySet()) {
         Map<String, Object> listenerProps = entry.getValue();
-        if (triggerName.equals(listenerProps.get("trigger")) && !removeListeners) {
+        if (triggerName.equals(listenerProps.get(TRIGGER)) && !removeListeners) {
           activeListeners.add(entry.getKey());
         }
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ee2be202/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
index 1a2a5e5..39dea17 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
@@ -75,14 +75,34 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
   @Test
   public void testSuspendTrigger() throws Exception {
     CloudSolrClient solrClient = cluster.getSolrClient();
+    String suspendEachCommand = "{\n" +
+        "\t\"suspend-trigger\" : {\n" +
+        "\t\t\"name\" : \"" + Policy.EACH + "\"\n" +
+        "\t}\n" +
+        "}";
+    String resumeEachCommand = "{\n" +
+        "\t\"resume-trigger\" : {\n" +
+        "\t\t\"name\" : \"" + Policy.EACH + "\"\n" +
+        "\t}\n" +
+        "}";
+    // these should be no-ops because there are no triggers, and it should succeed
+    SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendEachCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+    assertEquals(response.get("changed").toString(), "[]");
+    req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeEachCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+    assertEquals(response.get("changed").toString(), "[]");
+
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
         "'name' : 'node_lost_trigger'," +
         "'event' : 'nodeLost'," +
         "'waitFor' : '10m'," +
         "'enabled' : true}}";
-    SolrRequest req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
-    NamedList<Object> response = solrClient.request(req);
+    req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
 
     setTriggerCommand = "{" +
@@ -105,6 +125,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
     req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
+    assertEquals(response.get("changed").toString(), "[node_lost_trigger]");
 
     byte[] data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
     ZkNodeProps loaded = ZkNodeProps.load(data);
@@ -122,12 +143,15 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
 
     suspendTriggerCommand = "{" +
         "'suspend-trigger' : {" +
-        "'name' : '#EACH'" +
+        "'name' : '" + Policy.EACH + "'" +
         "}" +
         "}";
     req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
+    List<String> changed = (List<String>)response.get("changed");
+    assertEquals(1, changed.size());
+    assertTrue(changed.contains("node_added_trigger"));
     data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
     loaded = ZkNodeProps.load(data);
     triggers = (Map<String, Object>) loaded.get("triggers");
@@ -148,6 +172,9 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
     req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeTriggerCommand);
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
+    changed = (List<String>)response.get("changed");
+    assertEquals(1, changed.size());
+    assertTrue(changed.contains("node_added_trigger"));
     data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
     loaded = ZkNodeProps.load(data);
     triggers = (Map<String, Object>) loaded.get("triggers");
@@ -162,12 +189,15 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
 
     resumeTriggerCommand = "{" +
         "'resume-trigger' : {" +
-        "'name' : '#EACH'" +
+        "'name' : '" + Policy.EACH + "'" +
         "}" +
         "}";
     req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, resumeTriggerCommand);
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
+    changed = (List<String>)response.get("changed");
+    assertEquals(1, changed.size());
+    assertTrue(changed.contains("node_lost_trigger"));
     data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
     loaded = ZkNodeProps.load(data);
     triggers = (Map<String, Object>) loaded.get("triggers");
@@ -189,6 +219,9 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
     req = new AutoScalingRequest(SolrRequest.METHOD.POST, path, suspendTriggerCommand);
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
+    changed = (List<String>)response.get("changed");
+    assertEquals(1, changed.size());
+    assertTrue(changed.contains("node_lost_trigger"));
     data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
     loaded = ZkNodeProps.load(data);
     triggers = (Map<String, Object>) loaded.get("triggers");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ee2be202/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
new file mode 100644
index 0000000..1b5feb9
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common.params;
+
+/**
+ * Requests parameters for autoscaling.
+ */
+public interface AutoScalingParams {
+
+  // parameters
+  String DIAGNOSTICS = "diagnostics";
+  String NAME = "name";
+  String TRIGGER = "trigger";
+  String EVENT = "event";
+  String ACTIONS = "actions";
+  String WAIT_FOR = "waitFor";
+  String LOWER_BOUND = "lowerBound";
+  String UPPER_BOUND = "upperBound";
+  String STAGE = "stage";
+  String CLASS = "class";
+  String ENABLED = "enabled";
+  String RESUME_AT = "resumeAt";
+  String BEFORE_ACTION = "beforeAction";
+  String AFTER_ACTION = "afterAction";
+  String TIMEOUT = "timeout";
+  String REMOVE_LISTENERS = "removeListeners";
+
+  // commands
+  String CMD_SET_TRIGGER = "set-trigger";
+  String CMD_REMOVE_TRIGGER = "remove-trigger";
+  String CMD_SET_LISTENER = "set-listener";
+  String CMD_REMOVE_LISTENER = "remove-listener";
+  String CMD_SUSPEND_TRIGGER = "suspend-trigger";
+  String CMD_RESUME_TRIGGER = "resume-trigger";
+  String CMD_SET_POLICY = "set-policy";
+  String CMD_REMOVE_POLICY = "remove-policy";
+  String CMD_SET_CLUSTER_PREFERENCES = "set-cluster-preferences";
+  String CMD_SET_CLUSTER_POLICY = "set-cluster-policy";
+}


[3/3] lucene-solr:jira/solr-10745: SOLR-10745: Fix buggy element removal in triggers. Move marker processing to trigger init(). Clean old markers on Overseer start if there are no triggers to consume them.

Posted by ab...@apache.org.
SOLR-10745: Fix buggy element removal in triggers. Move marker processing to
trigger init(). Clean old markers on Overseer start if there are no triggers to
consume them.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b67de922
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b67de922
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b67de922

Branch: refs/heads/jira/solr-10745
Commit: b67de922e409d8c88b9e82655ed4a3bf0d35d4f7
Parents: c0f3f0d
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jun 1 16:02:13 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jun 1 16:02:13 2017 +0200

----------------------------------------------------------------------
 .../org/apache/solr/cloud/ZkController.java     | 12 +--
 .../cloud/autoscaling/AutoScalingConfig.java    | 97 ++++++++++++++++++++
 .../cloud/autoscaling/NodeAddedTrigger.java     | 60 +++++++-----
 .../solr/cloud/autoscaling/NodeLostTrigger.java | 56 ++++++-----
 .../autoscaling/OverseerTriggerThread.java      | 52 ++++++++++-
 .../solr/cloud/autoscaling/TriggerBase.java     |  8 +-
 .../solr/cloud/autoscaling/TestPolicyCloud.java |  1 -
 .../autoscaling/TriggerIntegrationTest.java     | 91 +++++++++++-------
 8 files changed, 285 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a9004b8..37019ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -741,11 +741,9 @@ public class ZkController {
     LiveNodesListener listener = (oldNodes, newNodes) -> {
       oldNodes.removeAll(newNodes);
       if (oldNodes.isEmpty()) { // only added nodes
-        log.debug("-- skip, only new nodes: " + newNodes);
         return;
       }
       if (isClosed) {
-        log.debug("-- skip, closed: old=" + oldNodes + ", new=" + newNodes);
         return;
       }
       // if this node is in the top three then attempt to create nodeLost message
@@ -755,21 +753,17 @@ public class ZkController {
           break;
         }
         if (i > 2) {
-          log.debug("-- skip, " + getNodeName() + " not in the top 3 of " + newNodes);
           return; // this node is not in the top three
         }
         i++;
       }
+
       for (String n : oldNodes) {
         String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
         try {
-          // nocommit decide between EPHEMERAL vs. PERSISTENT, the latter needs
-          // explicit cleanup on cluster restart if there are no nodeLost triggers
           zkClient.create(path, null, CreateMode.PERSISTENT, true);
-          log.debug("-- created " + path);
         } catch (KeeperException.NodeExistsException e) {
           // someone else already created this node - ignore
-          log.debug("-- skip, already exists " + path);
         } catch (KeeperException | InterruptedException e1) {
           log.warn("Unable to register nodeLost path for " + n, e1);
         }
@@ -857,8 +851,8 @@ public class ZkController {
     List<Op> ops = new ArrayList<>(2);
     ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
     if (!zkClient.exists(nodeAddedPath, true)) {
-      // nocommit use EPHEMERAL or PERSISTENT?
-      // EPHEMERAL will disappear if this node shuts down, PERSISTENT will need an explicit cleanup
+      // use EPHEMERAL so that it disappears if this node goes down
+      // and no other action is taken
       ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
     }
     zkClient.multi(ops, true);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
new file mode 100644
index 0000000..2877cb9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
@@ -0,0 +1,97 @@
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.Utils;
+
+/**
+ * Simple bean representation of <code>autoscaling.json</code>, which parses data
+ * lazily.
+ */
+public class AutoScalingConfig {
+
+  private final Map<String, Object> jsonMap;
+
+  private Policy policy;
+  private Map<String, TriggerConfig> triggers;
+  private Map<String, ListenerConfig> listeners;
+
+  /**
+   * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.TriggerListener} config.
+   */
+  public static class ListenerConfig {
+    public String trigger;
+    public List<String> stages;
+    public String listenerClass;
+    public List<Map<String, String>> beforeActions;
+    public List<Map<String, String>> afterActions;
+
+    public ListenerConfig(Map<String, Object> properties) {
+      trigger = (String)properties.get(AutoScalingParams.TRIGGER);
+      stages = (List<String>)properties.getOrDefault(AutoScalingParams.STAGE, Collections.emptyList());
+      listenerClass = (String)properties.get(AutoScalingParams.CLASS);
+      beforeActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.BEFORE_ACTION, Collections.emptyList());
+      afterActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.AFTER_ACTION, Collections.emptyList());
+    }
+  }
+
+  /**
+   * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
+   */
+  public static class TriggerConfig {
+    public final AutoScaling.EventType eventType;
+    public final Map<String, Object> properties = new HashMap<>();
+
+    public TriggerConfig(Map<String, Object> properties) {
+      String event = (String) properties.get(AutoScalingParams.EVENT);
+      this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
+      this.properties.putAll(properties);
+    }
+  }
+
+  public AutoScalingConfig(Map<String, Object> jsonMap) {
+    this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
+  }
+
+  public Policy getPolicy() {
+    if (policy == null) {
+      policy = new Policy(jsonMap);
+    }
+    return policy;
+  }
+
+  public Map<String, TriggerConfig> getTriggerConfigs() {
+    if (triggers == null) {
+      Map<String, Object> trigMap = (Map<String, Object>)jsonMap.get("triggers");
+      if (trigMap == null) {
+        triggers = Collections.emptyMap();
+      } else {
+        triggers = new HashMap<>(trigMap.size());
+        for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
+          triggers.put(entry.getKey(), new TriggerConfig((Map<String, Object>)entry.getValue()));
+        }
+      }
+    }
+    return triggers;
+  }
+
+  public Map<String, ListenerConfig> getListenerConfigs() {
+    if (listeners == null) {
+      Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
+      if (map == null) {
+        listeners = Collections.emptyMap();
+      } else {
+        listeners = new HashMap<>(map.size());
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+          listeners.put(entry.getKey(), new ListenerConfig((Map<String, Object>)entry.getValue()));
+        }
+      }
+    }
+    return listeners;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index f304ba7..7a46fc7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -62,7 +62,7 @@ public class NodeAddedTrigger extends TriggerBase {
 
   private Set<String> lastLiveNodes;
 
-  private Map<String, Long> nodeNameVsTimeAdded = new ConcurrentHashMap<>();
+  private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
 
   public NodeAddedTrigger(String name, Map<String, Object> properties,
                           CoreContainer container) {
@@ -99,6 +99,20 @@ public class NodeAddedTrigger extends TriggerBase {
         actions.get(i).init(map);
       }
     }
+    // pick up added nodes for which marker paths were created
+    try {
+      List<String> added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+      added.forEach(n -> {
+        log.debug("Adding node from marker path: {}", n);
+        nodeNameVsTimeAdded.put(n, timeSource.getTime());
+        removeNodeAddedMarker(n);
+      });
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception retrieving nodeAdded markers", e);
+    }
+
   }
 
   @Override
@@ -225,28 +239,13 @@ public class NodeAddedTrigger extends TriggerBase {
       copyOfNew.removeAll(lastLiveNodes);
       copyOfNew.forEach(n -> {
         long eventTime = timeSource.getTime();
-        nodeNameVsTimeAdded.put(n, eventTime);
         log.debug("Tracking new node: {} at time {}", n, eventTime);
+        nodeNameVsTimeAdded.put(n, eventTime);
       });
 
-      // pick up added nodes for which marker paths were created
-      try {
-        List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
-        lost.forEach(n -> {
-          log.debug("Adding node from marker path: {}", n);
-          nodeNameVsTimeAdded.put(n, timeSource.getTime());
-          try {
-            container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n, -1, true);
-          } catch (KeeperException | InterruptedException e) {
-            log.debug("Exception removing nodeAdded marker " + n, e);
-          }
-        });
-      } catch (KeeperException | InterruptedException e) {
-        log.warn("Exception retrieving nodeLost markers", e);
-      }
-
       // has enough time expired to trigger events for a node?
-      for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
+      for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeAdded.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Long> entry = it.next();
         String nodeName = entry.getKey();
         Long timeAdded = entry.getValue();
         long now = timeSource.getTime();
@@ -257,20 +256,35 @@ public class NodeAddedTrigger extends TriggerBase {
             log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
             if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
               // remove from tracking set only if the fire was accepted
-              trackingKeySet.remove(nodeName);
+              it.remove();
+              removeNodeAddedMarker(nodeName);
             }
           } else  {
-            trackingKeySet.remove(nodeName);
+            it.remove();
+            removeNodeAddedMarker(nodeName);
           }
         }
       }
-
       lastLiveNodes = new HashSet(newLiveNodes);
     } catch (RuntimeException e) {
       log.error("Unexpected exception in NodeAddedTrigger", e);
     }
   }
 
+  private void removeNodeAddedMarker(String nodeName) {
+    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
+    try {
+      if (container.getZkController().getZkClient().exists(path, true)) {
+        container.getZkController().getZkClient().delete(path, -1, true);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.debug("Exception removing nodeAdded marker " + nodeName, e);
+    }
+
+  }
+
   @Override
   public boolean isClosed() {
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 6450bda..2af4cc5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -62,7 +62,7 @@ public class NodeLostTrigger extends TriggerBase {
 
   private Set<String> lastLiveNodes;
 
-  private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
+  private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
 
   public NodeLostTrigger(String name, Map<String, Object> properties,
                          CoreContainer container) {
@@ -98,6 +98,19 @@ public class NodeLostTrigger extends TriggerBase {
         actions.get(i).init(map);
       }
     }
+    // pick up lost nodes for which marker paths were created
+    try {
+      List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+      lost.forEach(n -> {
+        log.debug("Adding lost node from marker path: {}", n);
+        nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+        removeNodeLostMarker(n);
+      });
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception retrieving nodeLost markers", e);
+    }
   }
 
   @Override
@@ -227,24 +240,9 @@ public class NodeLostTrigger extends TriggerBase {
         nodeNameVsTimeRemoved.put(n, timeSource.getTime());
       });
 
-      // pick up lost nodes for which marker paths were created
-      try {
-        List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
-        lost.forEach(n -> {
-          log.debug("Adding lost node from marker path: {}", n);
-          nodeNameVsTimeRemoved.put(n, timeSource.getTime());
-          try {
-            container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n, -1, true);
-          } catch (KeeperException | InterruptedException e) {
-            log.warn("Exception removing nodeLost marker " + n, e);
-          }
-        });
-      } catch (KeeperException | InterruptedException e) {
-        log.warn("Exception retrieving nodeLost markers", e);
-      }
-
       // has enough time expired to trigger events for a node?
-      for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
+      for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeRemoved.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Long> entry = it.next();
         String nodeName = entry.getKey();
         Long timeRemoved = entry.getValue();
         if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
@@ -253,20 +251,34 @@ public class NodeLostTrigger extends TriggerBase {
           if (listener != null) {
             log.debug("NodeLostTrigger firing registered listener");
             if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName)))  {
-              trackingKeySet.remove(nodeName);
+              it.remove();
+              removeNodeLostMarker(nodeName);
             }
           } else  {
-            trackingKeySet.remove(nodeName);
+            it.remove();
+            removeNodeLostMarker(nodeName);
           }
         }
       }
-
       lastLiveNodes = new HashSet<>(newLiveNodes);
     } catch (RuntimeException e) {
       log.error("Unexpected exception in NodeLostTrigger", e);
     }
   }
 
+  private void removeNodeLostMarker(String nodeName) {
+    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
+    try {
+      if (container.getZkController().getZkClient().exists(path, true)) {
+        container.getZkController().getZkClient().delete(path, -1, true);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception removing nodeLost marker " + nodeName, e);
+    }
+  }
+
   @Override
   public boolean isClosed() {
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 4a89ce7..91146b6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -169,10 +170,59 @@ public class OverseerTriggerThread implements Runnable, Closeable {
           scheduledTriggers.remove(managedTriggerName);
         }
       }
+      // check for nodeLost / nodeAdded triggers in the current config,
+      // and if absent then clean up old nodeLost / nodeAdded markers
+      boolean cleanOldNodeLostMarkers = true;
+      boolean cleanOldNodeAddedMarkers = true;
       // add new triggers and/or replace and close the replaced triggers
       for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
+        if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODELOST)) {
+          cleanOldNodeLostMarkers = false;
+        }
+        if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODEADDED)) {
+          cleanOldNodeAddedMarkers = false;
+        }
         scheduledTriggers.add(entry.getValue());
       }
+      if (cleanOldNodeLostMarkers) {
+        log.debug("-- clean old nodeLost markers");
+        try {
+          List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+          markers.forEach(n -> {
+            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
+          });
+        } catch (KeeperException.NoNodeException e) {
+          // ignore
+        } catch (KeeperException | InterruptedException e) {
+          log.warn("Error removing old nodeLost markers", e);
+        }
+      }
+      if (cleanOldNodeAddedMarkers) {
+        log.debug("-- clean old nodeAdded markers");
+        try {
+          List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+          markers.forEach(n -> {
+            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
+          });
+        } catch (KeeperException.NoNodeException e) {
+          // ignore
+        } catch (KeeperException | InterruptedException e) {
+          log.warn("Error removing old nodeAdded markers", e);
+        }
+
+      }
+    }
+  }
+
+  private void removeNodeMarker(String path, String nodeName) {
+    path = path + "/" + nodeName;
+    try {
+      zkClient.delete(path, -1, true);
+      log.debug("  -- deleted " + path);
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Error removing old marker " + path, e);
     }
   }
 
@@ -250,7 +300,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
 
     for (Map.Entry<String, Object> entry : triggers.entrySet()) {
       Map<String, Object> props = (Map<String, Object>) entry.getValue();
-      String event = (String) props.get("event");
+      String event = (String) props.get(AutoScalingParams.EVENT);
       AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
       String triggerName = entry.getKey();
       triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
index ef9a3cf..7aff846 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
@@ -97,11 +97,11 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
       LOG.warn("Exception getting trigger state '" + path + "'", e);
     }
     if (data != null) {
-      Map<String, Object> state = (Map<String, Object>)Utils.fromJSON(data);
+      Map<String, Object> restoredState = (Map<String, Object>)Utils.fromJSON(data);
       // make sure lastState is sorted
-      state = Utils.getDeepCopy(state, 10, false, true);;
-      setState(state);
-      lastState = state;
+      restoredState = Utils.getDeepCopy(restoredState, 10, false, true);
+      setState(restoredState);
+      lastState = restoredState;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
index 5038278..a809873 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
@@ -30,7 +30,6 @@ import org.apache.solr.cloud.OverseerTaskProcessor;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index b2c95b7..ae3f72d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -139,6 +139,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     paths.forEach(n -> {
       try {
         ZKUtil.deleteRecursive(zkClient().getSolrZooKeeper(), path + "/" + n);
+      } catch (KeeperException.NoNodeException e) {
+        // ignore
       } catch (KeeperException | InterruptedException e) {
         log.warn("Error deleting old data", e);
       }
@@ -634,10 +636,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public void process(TriggerEvent event) {
+      log.info("-- event: " + event);
       events.add(event);
       getActionStarted().countDown();
       try {
-        Thread.sleep(5000);
+        Thread.sleep(eventQueueActionWait);
         triggerFired.compareAndSet(false, true);
         getActionCompleted().countDown();
       } catch (InterruptedException e) {
@@ -658,6 +661,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     }
   }
 
+  public static long eventQueueActionWait = 5000;
+
   @Test
   public void testEventQueue() throws Exception {
     CloudSolrClient solrClient = cluster.getSolrClient();
@@ -696,19 +701,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     assertNotNull(nodeAddedEvent);
     // but action did not complete yet so the event is still enqueued
     assertFalse(triggerFired.get());
+    events.clear();
     actionStarted = new CountDownLatch(1);
+    eventQueueActionWait = 1;
     // kill overseer leader
     cluster.stopJettySolrRunner(overseerLeaderIndex);
     Thread.sleep(5000);
+    // new overseer leader should be elected and run triggers
     await = actionInterrupted.await(3, TimeUnit.SECONDS);
     assertTrue("action wasn't interrupted", await);
-    // new overseer leader should be elected and run triggers
-    newNode = cluster.startJettySolrRunner();
-    // it should fire again but not complete yet
+    // it should fire again from enqueued event
     await = actionStarted.await(60, TimeUnit.SECONDS);
     TriggerEvent replayedEvent = events.iterator().next();
     assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
-    assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
+    assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
     await = actionCompleted.await(10, TimeUnit.SECONDS);
     assertTrue(triggerFired.get());
   }
@@ -743,6 +749,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
       }
     }
 
+    events.clear();
+
     JettySolrRunner newNode = cluster.startJettySolrRunner();
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
@@ -797,8 +805,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   }
 
   public static class TestEventMarkerAction implements TriggerAction {
-    // sanity check that an action instance is only invoked once
-    private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
 
     public TestEventMarkerAction() {
       actionConstructorCalled.countDown();
@@ -806,7 +812,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public String getName() {
-      return "TestTriggerAction";
+      return "TestEventMarkerAction";
     }
 
     @Override
@@ -841,17 +847,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public void init(Map<String, String> args) {
-      log.info("TestTriggerAction init");
+      log.info("TestEventMarkerAction init");
       actionInitCalled.countDown();
     }
   }
 
   @Test
-  public void testNodeEventsRegistration() throws Exception {
+  public void testNodeMarkersRegistration() throws Exception {
     // for this test we want to create two triggers so we must assert that the actions were created twice
     actionInitCalled = new CountDownLatch(2);
     // similarly we want both triggers to fire
-    triggerFiredLatch = new CountDownLatch(3);
+    triggerFiredLatch = new CountDownLatch(2);
     TestLiveNodesListener listener = registerLiveNodesListener();
 
     NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
@@ -875,7 +881,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
     assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
     listener.reset();
-    // stop overseer, which should also cause nodeLost event
+    // stop overseer
+    log.info("====== KILL OVERSEER 1");
     cluster.stopJettySolrRunner(overseerLeaderIndex);
     if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
       fail("onChange listener didn't execute on cluster change");
@@ -883,12 +890,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     assertEquals(1, listener.lostNodes.size());
     assertEquals(overseerLeader, listener.lostNodes.iterator().next());
     assertEquals(0, listener.addedNodes.size());
-    // verify that a znode exists
+    // wait until the new overseer is up
+    Thread.sleep(5000);
+    // verify that a znode does NOT exist - there's no nodeLost trigger,
+    // so the new overseer cleaned up existing nodeLost markers
     String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
-    assertTrue("Path " + pathLost + " wasn't created", zkClient().exists(pathLost, true));
+    assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
 
     listener.reset();
     // create another node
+    log.info("====== ADD NODE 1");
     JettySolrRunner node1 = cluster.startJettySolrRunner();
     if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
       fail("onChange listener didn't execute on cluster change");
@@ -902,6 +913,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     // set up triggers
     CloudSolrClient solrClient = cluster.getSolrClient();
 
+    log.info("====== ADD TRIGGERS");
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
         "'name' : 'node_added_trigger'," +
@@ -926,25 +938,40 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
 
-    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
-      fail("Both triggers should have fired by now");
-    }
-    assertEquals(3, events.size());
-    // 2 nodeAdded, 1 nodeLost
-    int nodeAdded = 0;
-    int nodeLost = 0;
-    for (TriggerEvent ev : events) {
-      String nodeName = (String)ev.getProperty(TriggerEvent.NODE_NAME);
-      if (ev.eventType.equals(AutoScaling.EventType.NODELOST)) {
-        assertEquals(overseerLeader, nodeName);
-        nodeLost++;
-      } else if (ev.eventType.equals(AutoScaling.EventType.NODEADDED)) {
-        assertTrue(nodeName + " not one of: " + node.getNodeName() + ", " + node1.getNodeName(),
-            nodeName.equals(node.getNodeName()) || nodeName.equals(node1.getNodeName()));
-        nodeAdded++;
+    overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    overseerLeader = (String) overSeerStatus.get("leader");
+    overseerLeaderIndex = 0;
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (jetty.getNodeName().equals(overseerLeader)) {
+        overseerLeaderIndex = i;
+        break;
       }
     }
-    assertEquals(1, nodeLost);
-    assertEquals(2, nodeAdded);
+
+    Thread.sleep(5000);
+    // old nodeAdded markers should be consumed now by nodeAdded trigger
+    // but it doesn't result in new events because all nodes have been added
+    // before we configured the trigger
+    assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+
+    listener.reset();
+    events.clear();
+    triggerFiredLatch = new CountDownLatch(1);
+    // kill overseer again
+    log.info("====== KILL OVERSEER 2");
+    cluster.stopJettySolrRunner(overseerLeaderIndex);
+    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+
+
+    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+      fail("Trigger should have fired by now");
+    }
+    assertEquals(1, events.size());
+    TriggerEvent ev = events.iterator().next();
+    assertEquals(overseerLeader, ev.getProperty(TriggerEvent.NODE_NAME));
+    assertEquals(AutoScaling.EventType.NODELOST, ev.getEventType());
   }
 }


[2/3] lucene-solr:jira/solr-10745: Merge branch 'feature/autoscaling' into jira/solr-10745

Posted by ab...@apache.org.
Merge branch 'feature/autoscaling' into jira/solr-10745


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c0f3f0dc
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c0f3f0dc
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c0f3f0dc

Branch: refs/heads/jira/solr-10745
Commit: c0f3f0dc921ec5e8310d799a0bae073d8da2f848
Parents: 3e05eef ee2be20
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed May 31 12:09:11 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed May 31 12:09:11 2017 +0200

----------------------------------------------------------------------
 .../cloud/autoscaling/AutoScalingHandler.java   | 148 +++++++++++--------
 .../autoscaling/AutoScalingHandlerTest.java     |  41 ++++-
 .../solr/common/params/AutoScalingParams.java   |  53 +++++++
 3 files changed, 179 insertions(+), 63 deletions(-)
----------------------------------------------------------------------