You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2017/07/04 09:12:11 UTC

lucene-solr:feature/autoscaling: SOLR-10965: New ExecutePlanAction for autoscaling which executes the operations computed by ComputePlanAction against the cluster

Repository: lucene-solr
Updated Branches:
  refs/heads/feature/autoscaling 2e19a94fc -> 418904607


SOLR-10965: New ExecutePlanAction for autoscaling which executes the operations computed by ComputePlanAction against the cluster


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/41890460
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/41890460
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/41890460

Branch: refs/heads/feature/autoscaling
Commit: 41890460789e11f687ce49d337d200e31c3be83f
Parents: 2e19a94
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Tue Jul 4 14:42:03 2017 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Tue Jul 4 14:42:03 2017 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../cloud/autoscaling/ExecutePlanAction.java    |  58 +++++-
 .../autoscaling/ExecutePlanActionTest.java      | 183 +++++++++++++++++++
 3 files changed, 239 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/41890460/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 700f153..f367a86 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -229,6 +229,9 @@ New Features
 * SOLR-10496: New ComputePlanAction for autoscaling which uses the policy framework to compute cluster
   operations upon a trigger fire. (Noble Paul, shalin)
 
+* SOLR-10965: New ExecutePlanAction for autoscaling which executes the operations computed by ComputePlanAction
+  against the cluster. (shalin)
+
 Bug Fixes
 ----------------------
 * SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/41890460/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
index d6c288b..86e24f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
@@ -18,12 +18,31 @@
 package org.apache.solr.cloud.autoscaling;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * todo nocommit
+ * This class is responsible for executing the cluster operations that are read from the {@link ActionContext}'s
+ * properties under the key name "operations".
  */
 public class ExecutePlanAction implements TriggerAction {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private Map<String, String> initArgs;
+
   @Override
   public void close() throws IOException {
 
@@ -31,16 +50,45 @@ public class ExecutePlanAction implements TriggerAction {
 
   @Override
   public void init(Map<String, String> args) {
-
+    this.initArgs = args;
   }
 
   @Override
   public String getName() {
-    return null;
+    return initArgs.get("name");
   }
 
   @Override
-  public void process(TriggerEvent event, ActionContext actionContext) {
-
+  public void process(TriggerEvent event, ActionContext context) {
+    log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());
+    CoreContainer container = context.getCoreContainer();
+    List<SolrRequest> operations = (List<SolrRequest>) context.getProperty("operations");
+    if (operations == null || operations.isEmpty()) {
+      log.info("No operations to execute for event: {}", event);
+      return;
+    }
+    try (CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder()
+        .withZkHost(container.getZkController().getZkServerAddress())
+        .withHttpClient(container.getUpdateShardHandler().getHttpClient())
+        .build()) {
+      for (SolrRequest operation : operations) {
+        log.info("Executing operation: {}", operation.getParams());
+        try {
+          SolrResponse response = operation.process(cloudSolrClient);
+          context.getProperties().compute("responses", (s, o) -> {
+            List<NamedList<Object>> responses = (List<NamedList<Object>>) o;
+            if (responses == null)  responses = new ArrayList<>(operations.size());
+            responses.add(response.getResponse());
+            return responses;
+          });
+        } catch (SolrServerException | HttpSolrClient.RemoteSolrException e) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+              "Unexpected exception executing operation: " + operation.getParams(), e);
+        }
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Unexpected IOException while processing event: " + event, e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/41890460/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
new file mode 100644
index 0000000..6f088ea
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
+
+/**
+ * Test for {@link ExecutePlanAction}
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
+public class ExecutePlanActionTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int NODE_COUNT = 2;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(NODE_COUNT)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+  }
+
+  @Before
+  public void setUp() throws Exception  {
+    super.setUp();
+    // clear any persisted auto scaling configuration
+    Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
+
+    if (cluster.getJettySolrRunners().size() < NODE_COUNT) {
+      // start enough additional nodes to bring the cluster back to NODE_COUNT
+      int numJetties = cluster.getJettySolrRunners().size();
+      for (int i = 0; i < NODE_COUNT - numJetties; i++) {
+        cluster.startJettySolrRunner();
+      }
+    }
+    cluster.waitForAllNodes(30);
+  }
+
+  @Test
+  public void testExecute() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String collectionName = "testExecute";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+        "conf", 1, 2);
+    create.setMaxShardsPerNode(1);
+    create.process(solrClient);
+
+    waitForState("Timed out waiting for replicas of new collection to be active",
+        collectionName, clusterShape(1, 2));
+
+    JettySolrRunner sourceNode = cluster.getRandomJetty(random());
+    String sourceNodeName = sourceNode.getNodeName();
+    ClusterState clusterState = solrClient.getZkStateReader().getClusterState();
+    DocCollection docCollection = clusterState.getCollection(collectionName);
+    List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
+    assertNotNull(replicas);
+    assertFalse(replicas.isEmpty());
+
+    List<JettySolrRunner> otherJetties = cluster.getJettySolrRunners().stream()
+        .filter(jettySolrRunner -> jettySolrRunner != sourceNode).collect(Collectors.toList());
+    assertFalse(otherJetties.isEmpty());
+    JettySolrRunner survivor = otherJetties.get(0);
+
+    try (ExecutePlanAction action = new ExecutePlanAction()) {
+      action.init(Collections.singletonMap("name", "execute_plan"));
+      CollectionAdminRequest.MoveReplica moveReplica = new CollectionAdminRequest.MoveReplica(collectionName, replicas.get(0).getName(), survivor.getNodeName());
+      List<SolrRequest> operations = Collections.singletonList(moveReplica);
+      NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent(AutoScaling.EventType.NODELOST,
+          "mock_trigger_name", TimeSource.CURRENT_TIME.getTime(), sourceNodeName);
+      ActionContext actionContext = new ActionContext(survivor.getCoreContainer(), null,
+          new HashMap<>(Collections.singletonMap("operations", operations)));
+      action.process(nodeLostEvent, actionContext);
+
+      List<NamedList<Object>> responses = (List<NamedList<Object>>) actionContext.getProperty("responses");
+      assertNotNull(responses);
+      assertEquals(1, responses.size());
+      NamedList<Object> response = responses.get(0);
+      assertNull(response.get("failure"));
+      assertNotNull(response.get("success"));
+    }
+
+    waitForState("Timed out waiting for replicas of new collection to be active",
+        collectionName, clusterShape(1, 2));
+  }
+
+  @Test
+  public void testIntegration() throws Exception  {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+        "{'name':'execute_plan','class':'solr.ExecutePlanAction'}]" +
+        "}}";
+    SolrRequest req = AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String collectionName = "testIntegration";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+        "conf", 1, 2);
+    create.setMaxShardsPerNode(1);
+    create.process(solrClient);
+
+    waitForState("Timed out waiting for replicas of new collection to be active",
+        collectionName, clusterShape(1, 2));
+
+    JettySolrRunner sourceNode = cluster.getRandomJetty(random());
+    String sourceNodeName = sourceNode.getNodeName();
+    ClusterState clusterState = solrClient.getZkStateReader().getClusterState();
+    DocCollection docCollection = clusterState.getCollection(collectionName);
+    List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
+    assertNotNull(replicas);
+    assertFalse(replicas.isEmpty());
+
+    List<JettySolrRunner> otherJetties = cluster.getJettySolrRunners().stream()
+        .filter(jettySolrRunner -> jettySolrRunner != sourceNode).collect(Collectors.toList());
+    assertFalse(otherJetties.isEmpty());
+    JettySolrRunner survivor = otherJetties.get(0);
+
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner runner = cluster.getJettySolrRunner(i);
+      if (runner == sourceNode) {
+        cluster.stopJettySolrRunner(i);
+      }
+    }
+
+    cluster.waitForAllNodes(30);
+    waitForState("Timed out waiting for replicas of collection to be 2 again",
+        collectionName, clusterShape(1, 2));
+
+    clusterState = solrClient.getZkStateReader().getClusterState();
+    docCollection = clusterState.getCollection(collectionName);
+    List<Replica> replicasOnSurvivor = docCollection.getReplicas(survivor.getNodeName());
+    assertNotNull(replicasOnSurvivor);
+    assertEquals(2, replicasOnSurvivor.size());
+  }
+}