You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 11:56:49 UTC
[2/9] lucene-solr:master: SOLR-11285: Simulation framework for
autoscaling.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
new file mode 100644
index 0000000..a05eb78
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -0,0 +1,1217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
+import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
+import org.apache.solr.cloud.autoscaling.TriggerActionBase;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.cloud.autoscaling.TriggerEventQueue;
+import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
+import org.apache.solr.cloud.autoscaling.CapturedEvent;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
+
+/**
+ * An end-to-end integration test for triggers
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
+public class TestTriggerIntegration extends SimSolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Speed factor of the simulated time source ("simTime:" + SPEED); real-time waits are divided by it. */
+  public static final int SPEED = 50;
+
+  // The latches, the flag and waitForSeconds below are re-assigned by the test thread (setupTest
+  // and individual tests) while the test actions read them from trigger-action threads, so they
+  // are volatile to make the new values visible across threads.
+  private static volatile CountDownLatch actionConstructorCalled;
+  private static volatile CountDownLatch actionInitCalled;
+  private static volatile CountDownLatch triggerFiredLatch;
+  private static volatile int waitForSeconds = 1;
+  private static volatile CountDownLatch actionStarted;
+  private static volatile CountDownLatch actionInterrupted;
+  private static volatile CountDownLatch actionCompleted;
+  private static volatile AtomicBoolean triggerFired;
+  // events captured by the test actions; cleared in setupTest
+  private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+
+  // tolerance used by TestTriggerAction when checking that the waitFor period has elapsed
+  private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2, TimeSource.get("simTime:" + SPEED));
+ }
+
+  /** Returns the current latch counted down when a test action records a fired trigger. */
+  private static CountDownLatch getTriggerFiredLatch() {
+    return TestTriggerIntegration.triggerFiredLatch;
+  }
+
+  /** Returns the current latch counted down when TestEventQueueAction starts processing. */
+  private static CountDownLatch getActionStarted() {
+    return TestTriggerIntegration.actionStarted;
+  }
+
+  /** Returns the current latch counted down when TestEventQueueAction is interrupted. */
+  private static CountDownLatch getActionInterrupted() {
+    return TestTriggerIntegration.actionInterrupted;
+  }
+
+  /** Returns the current latch counted down when TestEventQueueAction finishes normally. */
+  private static CountDownLatch getActionCompleted() {
+    return TestTriggerIntegration.actionCompleted;
+  }
+
+  /**
+   * Resets all shared latches, flags and captured events before each test and makes sure the
+   * simulated cluster has at least two live nodes again.
+   */
+  @Before
+  public void setupTest() throws Exception {
+    // random waitFor period of 1..3 seconds for tests that don't set their own
+    waitForSeconds = 1 + random().nextInt(3);
+    actionConstructorCalled = new CountDownLatch(1);
+    actionInitCalled = new CountDownLatch(1);
+    triggerFiredLatch = new CountDownLatch(1);
+    triggerFired = new AtomicBoolean(false);
+    actionStarted = new CountDownLatch(1);
+    actionInterrupted = new CountDownLatch(1);
+    actionCompleted = new CountDownLatch(1);
+    events.clear();
+    listenerEvents.clear();
+    // a previous test may have stopped a node without starting it again - top the cluster
+    // back up to two live nodes
+    for (int liveCount = cluster.getClusterStateProvider().getLiveNodes().size(); liveCount < 2;
+         liveCount = cluster.getClusterStateProvider().getLiveNodes().size()) {
+      cluster.simAddNode();
+    }
+  }
+
+ @Test
+ public void testTriggerThrottling() throws Exception {
+ // for this test we want to create two triggers so we must assert that the actions were created twice
+ actionInitCalled = new CountDownLatch(2);
+ // similarly we want both triggers to fire
+ triggerFiredLatch = new CountDownLatch(2);
+
+ SolrClient solrClient = cluster.simGetSolrClient();
+
+ // first trigger
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger1'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // second trigger
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger2'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // wait until the two instances of action are created
+ if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("Two TriggerAction instances should have been created by now");
+ }
+
+ String newNode = cluster.simAddNode();
+
+ if (!triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("Both triggers should have fired by now");
+ }
+
+ // reset shared state
+ lastActionExecutedAt.set(0);
+ TestTriggerIntegration.actionInitCalled = new CountDownLatch(2);
+ triggerFiredLatch = new CountDownLatch(2);
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger1'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger2'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // wait until the two instances of action are created
+ if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("Two TriggerAction instances should have been created by now");
+ }
+
+ // stop the node we had started earlier
+ cluster.simRemoveNode(newNode, false);
+
+ if (!triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("Both triggers should have fired by now");
+ }
+ }
+
+  // simulated time (nanos) of the last ThrottlingTesterAction execution; 0 means "not yet"
+  static AtomicLong lastActionExecutedAt = new AtomicLong();
+  // test actions tryLock() this and bail out if it is already held
+  static ReentrantLock lock = new ReentrantLock();
+  /**
+   * Test action that fails if it runs before the default action throttle period has elapsed
+   * since the previous execution of any ThrottlingTesterAction, or if the same instance is
+   * invoked more than once. Counts down the trigger-fired latch on its single valid run.
+   */
+  public static class ThrottlingTesterAction extends TestTriggerAction {
+    // nanos are very precise so we need a delta for comparison with ms
+    private static final long DELTA_MS = 2;
+
+    // sanity check that an action instance is only invoked once
+    private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      boolean locked = lock.tryLock();
+      if (!locked) {
+        log.warn("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+        return;
+      }
+      try {
+        long lastExecutedAt = lastActionExecutedAt.get();
+        if (lastExecutedAt != 0) {
+          // read the clock once so the logged value is the one actually compared
+          long now = cluster.getTimeSource().getTime();
+          log.info("last action at {} time = {}", lastExecutedAt, now);
+          long elapsedMs = TimeUnit.NANOSECONDS.toMillis(now - lastExecutedAt);
+          long minIntervalMs =
+              TimeUnit.SECONDS.toMillis(ScheduledTriggers.DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS) - DELTA_MS;
+          if (elapsedMs < minIntervalMs) {
+            log.info("action executed again before minimum wait time from {}", event.getSource());
+            fail("TriggerListener was fired before the throttling period");
+          }
+        }
+        if (onlyOnce.compareAndSet(false, true)) {
+          log.info("action executed from {}", event.getSource());
+          lastActionExecutedAt.set(cluster.getTimeSource().getTime());
+          getTriggerFiredLatch().countDown();
+        } else {
+          log.info("action executed more than once from {}", event.getSource());
+          fail("Trigger should not have fired more than once!");
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Registers a nodeLost trigger with a long waitFor, removes a node, then replaces the trigger
+   * with waitFor '0s' and checks that the replacement fires for the node the old one had seen.
+   */
+  @Test
+  public void testNodeLostTriggerRestoreState() throws Exception {
+    // for this test we want to update the trigger so we must assert that the actions were created twice
+    actionInitCalled = new CountDownLatch(2);
+
+    // start a new node
+    String nodeName = cluster.simAddNode();
+
+    SolrClient solrClient = cluster.simGetSolrClient();
+    waitForSeconds = 5;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_restore_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '5s'," + // should be enough for us to update the trigger
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // Wait for the first action instance to be initialized (latch 2 -> 1). The second instance
+    // must not exist yet because the trigger has not been updated.
+    TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cluster.getTimeSource());
+    while (actionInitCalled.getCount() > 1 && !timeOut.hasTimedOut()) {
+      timeOut.sleep(200);
+    }
+    assertEquals("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds",
+        1, actionInitCalled.getCount());
+
+    cluster.simRemoveNode(nodeName, false);
+
+    // ensure that the old trigger sees the stopped node, todo find a better way to do this
+    cluster.getTimeSource().sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
+
+    waitForSeconds = 0;
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_restore_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // wait until the second instance of action is created
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    boolean await = triggerFiredLatch.await(5000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
+    assertNotNull(nodeLostEvent);
+    @SuppressWarnings("unchecked")
+    List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(nodeName));
+  }
+
+  /**
+   * Registers a nodeAdded trigger with a long waitFor, adds a node, then replaces the trigger
+   * with waitFor '0s' and checks that the replacement fires for the node the old one had seen.
+   */
+  @Test
+  public void testNodeAddedTriggerRestoreState() throws Exception {
+    // for this test we want to update the trigger so we must assert that the actions were created twice
+    actionInitCalled = new CountDownLatch(2);
+
+    SolrClient solrClient = cluster.simGetSolrClient();
+    waitForSeconds = 5;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_restore_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '5s'," + // should be enough for us to update the trigger
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // Wait for the first action instance to be initialized (latch 2 -> 1). The second instance
+    // must not exist yet because the trigger has not been updated.
+    TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cluster.getTimeSource());
+    while (actionInitCalled.getCount() > 1 && !timeOut.hasTimedOut()) {
+      timeOut.sleep(200);
+    }
+    assertEquals("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds",
+        1, actionInitCalled.getCount());
+
+    // start a new node
+    String newNode = cluster.simAddNode();
+
+    // ensure that the old trigger sees the new node, todo find a better way to do this
+    cluster.getTimeSource().sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
+
+    waitForSeconds = 0;
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_restore_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // wait until the second instance of action is created
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    boolean await = triggerFiredLatch.await(5000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    TriggerEvent nodeAddedEvent = events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    @SuppressWarnings("unchecked")
+    List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.toString(), nodeNames.contains(newNode));
+  }
+
+  /**
+   * Checks that a nodeAdded trigger fires for a newly added node, and that re-sending the
+   * identical trigger configuration constructs an action but does not call init() again.
+   */
+  @Test
+  public void testNodeAddedTrigger() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionInitCalled.await(5000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    String newNode = cluster.simAddNode();
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    TriggerEvent nodeAddedEvent = events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    @SuppressWarnings("unchecked")
+    List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeAddedEvent.toString(), nodeNames.contains(newNode));
+
+    // reset
+    actionConstructorCalled = new CountDownLatch(1);
+    actionInitCalled = new CountDownLatch(1);
+
+    // update the trigger with exactly the same data: waitForSeconds has not changed, so the
+    // command built above is re-sent verbatim
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // this should be a no-op so the action should have been created but init should not be called
+    if (!actionConstructorCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    assertFalse("init should not be called when the trigger configuration is unchanged",
+        actionInitCalled.await(2000 / SPEED, TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Checks that a nodeLost trigger fires for a removed node, and that re-sending the identical
+   * trigger configuration constructs an action but does not call init() again.
+   */
+  @Test
+  public void testNodeLostTrigger() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionInitCalled.await(5000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    String lostNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+    cluster.simRemoveNode(lostNodeName, false);
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    TriggerEvent nodeLostEvent = events.iterator().next();
+    assertNotNull(nodeLostEvent);
+    @SuppressWarnings("unchecked")
+    List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(lostNodeName));
+
+    // reset
+    actionConstructorCalled = new CountDownLatch(1);
+    actionInitCalled = new CountDownLatch(1);
+
+    // update the trigger with exactly the same data: waitForSeconds has not changed, so the
+    // command built above is re-sent verbatim
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // this should be a no-op so the action should have been created but init should not be called
+    if (!actionConstructorCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    assertFalse("init should not be called when the trigger configuration is unchanged",
+        actionInitCalled.await(2000 / SPEED, TimeUnit.MILLISECONDS));
+  }
+
+ // simulator doesn't support overseer functionality yet
+ /*
+ @Test
+ public void testContinueTriggersOnOverseerRestart() throws Exception {
+ CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ CollectionAdminResponse adminResponse = status.process(solrClient);
+ NamedList<Object> response = adminResponse.getResponse();
+ String leader = (String) response.get("leader");
+ JettySolrRunner overseerNode = null;
+ int index = -1;
+ List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+ for (int i = 0; i < jettySolrRunners.size(); i++) {
+ JettySolrRunner runner = jettySolrRunners.get(i);
+ if (runner.getNodeName().equals(leader)) {
+ overseerNode = runner;
+ index = i;
+ break;
+ }
+ }
+ assertNotNull(overseerNode);
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ // stop the overseer, somebody else will take over as the overseer
+ cluster.stopJettySolrRunner(index);
+ Thread.sleep(10000);
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+ assertNotNull(nodeAddedEvent);
+ List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(newNode.getNodeName()));
+ }
+
+*/
+
+  /**
+   * Test action that records the first event it receives and verifies that the event was not
+   * processed before waitForSeconds had elapsed. A second invocation fails the test.
+   */
+  public static class TestTriggerAction extends TriggerActionBase {
+
+    public TestTriggerAction() {
+      actionConstructorCalled.countDown();
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      try {
+        if (!triggerFired.compareAndSet(false, true)) {
+          fail(event.getSource() + " was fired more than once!");
+          return;
+        }
+        events.add(event);
+        long elapsedNanos = cluster.getTimeSource().getTime() - event.getEventTime();
+        long minElapsedNanos = TimeUnit.SECONDS.toNanos(waitForSeconds) - WAIT_FOR_DELTA_NANOS;
+        if (elapsedNanos <= minElapsedNanos) {
+          fail(event.getSource() + " was fired before the configured waitFor period");
+        }
+        getTriggerFiredLatch().countDown();
+      } catch (Throwable t) {
+        // log here as well, since a failure in an action thread may not reach the test thread
+        log.debug("--throwable", t);
+        throw t;
+      }
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.info("TestTriggerAction init");
+      actionInitCalled.countDown();
+      super.init(args);
+    }
+  }
+
+  /**
+   * Test action that records the event and then sleeps for eventQueueActionWait milliseconds
+   * (real time), so testEventQueue can interrupt it by restarting the overseer while the event
+   * is still in flight.
+   */
+  public static class TestEventQueueAction extends TriggerActionBase {
+
+    public TestEventQueueAction() {
+      log.info("TestEventQueueAction instantiated");
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      log.info("-- event: {}", event);
+      events.add(event);
+      getActionStarted().countDown();
+      try {
+        Thread.sleep(eventQueueActionWait);
+        triggerFired.compareAndSet(false, true);
+        getActionCompleted().countDown();
+      } catch (InterruptedException e) {
+        // NOTE(review): the interrupt status is not restored here; confirm that the trigger
+        // framework does not depend on that before adding Thread.currentThread().interrupt().
+        getActionInterrupted().countDown();
+      }
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.debug("TestEventQueueAction init");
+      actionInitCalled.countDown();
+      super.init(args);
+    }
+  }
+
+  // Real-time sleep (ms) performed by TestEventQueueAction.process. It is changed by the test
+  // thread and read by the action thread, hence volatile.
+  public static volatile long eventQueueActionWait = 5000;
+
+  /**
+   * Checks that an event whose action is interrupted by an overseer restart stays queued and is
+   * replayed (with enqueue/dequeue timestamps) by the new overseer.
+   */
+  @Test
+  public void testEventQueue() throws Exception {
+    waitForSeconds = 1;
+    // this test lowers the action's sleep further down; restore the long default so the first
+    // action is still running when the overseer is restarted, even on a repeated run
+    eventQueueActionWait = 5000;
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger1'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
+        "}}";
+
+    String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    // add node to generate the event
+    cluster.simAddNode();
+    boolean await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("action did not start", await);
+    // event should be there
+    TriggerEvent nodeAddedEvent = events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    // but action did not complete yet so the event is still enqueued
+    assertFalse(triggerFired.get());
+    events.clear();
+    actionStarted = new CountDownLatch(1);
+    eventQueueActionWait = 1;
+    // kill overseer
+    cluster.simRestartOverseer(overseerLeader);
+    cluster.getTimeSource().sleep(5000);
+    // new overseer leader should be elected and run triggers
+    await = actionInterrupted.await(3000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("action wasn't interrupted", await);
+    // it should fire again from enqueued event
+    await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("action wasn't started", await);
+    TriggerEvent replayedEvent = events.iterator().next();
+    assertNotNull(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME));
+    assertNotNull(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME));
+    await = actionCompleted.await(10000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("action wasn't completed", await);
+    assertTrue(triggerFired.get());
+  }
+
+  /**
+   * Checks that a nodeAdded trigger whose pending state was built before an overseer restart
+   * still fires after the restart.
+   */
+  @Test
+  public void testEventFromRestoredState() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '10s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    if (!actionInitCalled.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    events.clear();
+
+    String newNode = cluster.simAddNode();
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    // reset
+    triggerFired.set(false);
+    triggerFiredLatch = new CountDownLatch(1);
+    TriggerEvent nodeAddedEvent = events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    @SuppressWarnings("unchecked")
+    List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(newNode));
+    // add a second node - state of the trigger will change but it won't fire for waitFor sec.
+    cluster.simAddNode();
+    cluster.getTimeSource().sleep(10000);
+    // kill overseer
+    cluster.simRestartOverseer(null);
+    await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+  }
+
+  /**
+   * Live-nodes listener that accumulates the nodes added and lost between notifications and
+   * counts down onChangeLatch once a change has been recorded.
+   */
+  private static class TestLiveNodesListener implements LiveNodesListener {
+    // may be updated by a notifying thread while the test thread reads them
+    Set<String> lostNodes = ConcurrentHashMap.newKeySet();
+    Set<String> addedNodes = ConcurrentHashMap.newKeySet();
+    // re-assigned by reset() on the test thread, read in onChange()
+    volatile CountDownLatch onChangeLatch = new CountDownLatch(1);
+
+    public void reset() {
+      lostNodes.clear();
+      addedNodes.clear();
+      onChangeLatch = new CountDownLatch(1);
+    }
+
+    @Override
+    public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+      // work on copies: the argument sets belong to the caller and must not be modified here
+      Set<String> lost = new HashSet<>(oldLiveNodes);
+      lost.removeAll(newLiveNodes);
+      lostNodes.addAll(lost);
+      Set<String> added = new HashSet<>(newLiveNodes);
+      added.removeAll(oldLiveNodes);
+      addedNodes.addAll(added);
+      // count down only after recording the change, so a thread woken by the latch sees it
+      onChangeLatch.countDown();
+    }
+  }
+
+  /** Creates a new TestLiveNodesListener, registers it with the cluster's live-nodes set and returns it. */
+  private TestLiveNodesListener registerLiveNodesListener() {
+    TestLiveNodesListener newListener = new TestLiveNodesListener();
+    cluster.getLiveNodesSet().registerLiveNodesListener(newListener);
+    return newListener;
+  }
+
+  /**
+   * Test action that records every event it processes and counts down the trigger-fired latch.
+   * It skips processing (after logging) if the shared lock is already held.
+   */
+  public static class TestEventMarkerAction extends TriggerActionBase {
+
+    public TestEventMarkerAction() {
+      actionConstructorCalled.countDown();
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      if (lock.tryLock()) {
+        try {
+          events.add(event);
+          getTriggerFiredLatch().countDown();
+        } catch (Throwable t) {
+          log.debug("--throwable", t);
+          throw t;
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+      }
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.info("TestEventMarkerAction init");
+      actionInitCalled.countDown();
+      super.init(args);
+    }
+  }
+
+ @Test
+ public void testNodeMarkersRegistration() throws Exception {
+ // for this test we want to create two triggers so we must assert that the actions were created twice
+ actionInitCalled = new CountDownLatch(2);
+ // similarly we want both triggers to fire
+ triggerFiredLatch = new CountDownLatch(2);
+ TestLiveNodesListener listener = registerLiveNodesListener();
+
+ SolrClient solrClient = cluster.simGetSolrClient();
+
+ // pick overseer node
+ String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
+ // add a node
+ String node = cluster.simAddNode();
+ if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.addedNodes.size());
+ assertEquals(node, listener.addedNodes.iterator().next());
+ // verify that a znode doesn't exist (no trigger)
+ String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node;
+ assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers",
+ cluster.getDistribStateManager().hasData(pathAdded));
+ listener.reset();
+ // stop overseer
+ log.info("====== KILL OVERSEER 1");
+ cluster.simRestartOverseer(overseerLeader);
+ if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.lostNodes.size());
+ assertEquals(overseerLeader, listener.lostNodes.iterator().next());
+ assertEquals(0, listener.addedNodes.size());
+ // wait until the new overseer is up
+ cluster.getTimeSource().sleep(5000);
+ // verify that a znode does NOT exist - there's no nodeLost trigger,
+ // so the new overseer cleaned up existing nodeLost markers
+ String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
+ assertFalse("Path " + pathLost + " exists", cluster.getDistribStateManager().hasData(pathLost));
+
+ listener.reset();
+
+ // set up triggers
+
+ log.info("====== ADD TRIGGERS");
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
+ // create another node
+ log.info("====== ADD NODE 1");
+ String node1 = cluster.simAddNode();
+ if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.addedNodes.size());
+ assertEquals(node1, listener.addedNodes.iterator().next());
+ // verify that a znode exists
+ pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1;
+ assertTrue("Path " + pathAdded + " wasn't created", cluster.getDistribStateManager().hasData(pathAdded));
+
+ cluster.getTimeSource().sleep(5000);
+ // nodeAdded marker should be consumed now by nodeAdded trigger
+ assertFalse("Path " + pathAdded + " should have been deleted",
+ cluster.getDistribStateManager().hasData(pathAdded));
+
+ listener.reset();
+ events.clear();
+ triggerFiredLatch = new CountDownLatch(1);
+ // kill overseer again
+ log.info("====== KILL OVERSEER 2");
+ cluster.simRestartOverseer(overseerLeader);
+ if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+
+
+ if (!triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("Trigger should have fired by now");
+ }
+ assertEquals(1, events.size());
+ TriggerEvent ev = events.iterator().next();
+ List<String> nodeNames = (List<String>)ev.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(overseerLeader));
+ assertEquals(TriggerEventType.NODELOST, ev.getEventType());
+ }
+
+  /** Events captured by {@link TestTriggerListener}, keyed by listener name. */
+  static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
+  /** Counted down by {@link TestTriggerListener#init}; tests may replace it with a fresh latch. */
+  // volatile: tests reassign this field while listeners are initialized asynchronously by the trigger framework
+  static volatile CountDownLatch listenerCreated = new CountDownLatch(1);
+  /** When true, {@link TestDummyAction} throws to simulate a failing action. */
+  // volatile: toggled by the test thread and read by the action when the trigger fires asynchronously
+  static volatile boolean failDummyAction = false;
+
+ public static class TestTriggerListener extends TriggerListenerBase {
+ @Override
+ public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(cloudManager, config);
+ listenerCreated.countDown();
+ }
+
+ @Override
+ public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
+ List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+ lst.add(new CapturedEvent(cluster.getTimeSource().getTime(), context, config, stage, actionName, event, message));
+ }
+ }
+
+  /** Action that is a no-op unless {@code failDummyAction} is set, in which case it throws. */
+  public static class TestDummyAction extends TriggerActionBase {
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) {
+      if (!failDummyAction) {
+        return;
+      }
+      // simulate an action failure
+      throw new RuntimeException("failure");
+    }
+  }
+
+ /**
+ * Verifies listener notifications for a nodeAdded trigger with two actions: 'test'
+ * ({@link TestTriggerAction}) followed by 'test1' ({@link TestDummyAction}). Two listeners,
+ * 'foo' and 'bar', subscribe to different stages and beforeAction/afterAction hooks. A node
+ * is added and the exact sequence of captured stages is asserted for each listener: first
+ * with both actions succeeding, then with 'test1' forced to throw via {@code failDummyAction}.
+ */
+ @Test
+ public void testListeners() throws Exception {
+ SolrClient solrClient = cluster.simGetSolrClient();
+ // nodeAdded trigger with two actions, executed in order: 'test', then 'test1'
+ // NOTE(review): the actions array below ends with a trailing comma before ']'; this relies
+ // on the server-side JSON parser tolerating it - consider removing the comma
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [" +
+ "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}," +
+ "{'name':'test1','class':'" + TestDummyAction.class.getName() + "'}," +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ // listener 'foo': stages STARTED/ABORTED/SUCCEEDED/FAILED, before 'test', after 'test' and 'test1'
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'foo'," +
+ "'trigger' : 'node_added_trigger'," +
+ "'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
+ "'beforeAction' : 'test'," +
+ "'afterAction' : ['test', 'test1']," +
+ "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // listener 'bar': stages FAILED/SUCCEEDED only, before 'test' and 'test1', after 'test'
+ String setListenerCommand1 = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'bar'," +
+ "'trigger' : 'node_added_trigger'," +
+ "'stage' : ['FAILED','SUCCEEDED']," +
+ "'beforeAction' : ['test', 'test1']," +
+ "'afterAction' : 'test'," +
+ "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // --- round 1: both actions succeed ---
+ listenerEvents.clear();
+ failDummyAction = false;
+
+ // NOTE(review): newNode is assigned but never read
+ String newNode = cluster.simAddNode();
+ boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+
+ assertEquals("both listeners should have fired", 2, listenerEvents.size());
+
+ // wait for the listeners to capture the final (SUCCEEDED) stage
+ cluster.getTimeSource().sleep(2000);
+
+ // check foo events
+ List<CapturedEvent> testEvents = listenerEvents.get("foo");
+ assertNotNull("foo events: " + testEvents, testEvents);
+ assertEquals("foo events: " + testEvents, 5, testEvents.size());
+
+ assertEquals(TriggerEventProcessorStage.STARTED, testEvents.get(0).stage);
+
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
+ assertEquals("test", testEvents.get(1).actionName);
+
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
+ assertEquals("test", testEvents.get(2).actionName);
+
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(3).stage);
+ assertEquals("test1", testEvents.get(3).actionName);
+
+ assertEquals(TriggerEventProcessorStage.SUCCEEDED, testEvents.get(4).stage);
+
+ // check bar events
+ testEvents = listenerEvents.get("bar");
+ assertNotNull("bar events", testEvents);
+ assertEquals("bar events", 4, testEvents.size());
+
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
+ assertEquals("test", testEvents.get(0).actionName);
+
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
+ assertEquals("test", testEvents.get(1).actionName);
+
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
+ assertEquals("test1", testEvents.get(2).actionName);
+
+ assertEquals(TriggerEventProcessorStage.SUCCEEDED, testEvents.get(3).stage);
+
+ // --- round 2: 'test1' throws, so processing is expected to end in FAILED ---
+ // reset
+ triggerFired.set(false);
+ triggerFiredLatch = new CountDownLatch(1);
+ listenerEvents.clear();
+ failDummyAction = true;
+
+ newNode = cluster.simAddNode();
+ await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+ assertTrue("The trigger did not fire at all", await);
+
+ cluster.getTimeSource().sleep(2000);
+
+ // check foo events: no AFTER_ACTION for 'test1' (it threw); FAILED carries the failing action name
+ testEvents = listenerEvents.get("foo");
+ assertNotNull("foo events: " + testEvents, testEvents);
+ assertEquals("foo events: " + testEvents, 4, testEvents.size());
+
+ assertEquals(TriggerEventProcessorStage.STARTED, testEvents.get(0).stage);
+
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
+ assertEquals("test", testEvents.get(1).actionName);
+
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
+ assertEquals("test", testEvents.get(2).actionName);
+
+ assertEquals(TriggerEventProcessorStage.FAILED, testEvents.get(3).stage);
+ assertEquals("test1", testEvents.get(3).actionName);
+
+ // check bar events
+ testEvents = listenerEvents.get("bar");
+ assertNotNull("bar events", testEvents);
+ assertEquals("bar events", 4, testEvents.size());
+
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
+ assertEquals("test", testEvents.get(0).actionName);
+
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
+ assertEquals("test", testEvents.get(1).actionName);
+
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
+ assertEquals("test1", testEvents.get(2).actionName);
+
+ assertEquals(TriggerEventProcessorStage.FAILED, testEvents.get(3).stage);
+ assertEquals("test1", testEvents.get(3).actionName);
+ }
+
+  /**
+   * Verifies the trigger cooldown: after a successful run, a nodeAdded event arriving within
+   * the cooldown window is reported as IGNORED (with a "cooldown" message), and the next
+   * SUCCEEDED run happens more than {@code ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS}
+   * after the previous one.
+   */
+  @Test
+  public void testCooldown() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    failDummyAction = false;
+    waitForSeconds = 1;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_cooldown_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [" +
+        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String setListenerCommand1 = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'bar'," +
+        "'trigger' : 'node_added_cooldown_trigger'," +
+        "'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
+        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    listenerCreated = new CountDownLatch(1);
+    listenerEvents.clear();
+
+    // first node: the trigger runs normally
+    cluster.simAddNode();
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    // wait for listener to capture the SUCCEEDED stage
+    cluster.getTimeSource().sleep(1000);
+
+    List<CapturedEvent> capturedEvents = listenerEvents.get("bar");
+    // report a readable failure (instead of an NPE in the message below) if nothing was captured
+    assertNotNull("no events captured by listener 'bar': " + listenerEvents, capturedEvents);
+    // we may get a few IGNORED events if other tests caused events within cooldown period
+    assertTrue(capturedEvents.toString(), capturedEvents.size() > 0);
+    long prevTimestamp = capturedEvents.get(capturedEvents.size() - 1).timestamp;
+
+    // reset the trigger and captured events
+    listenerEvents.clear();
+    triggerFiredLatch = new CountDownLatch(1);
+    triggerFired.compareAndSet(true, false);
+
+    // second node: added while the trigger is cooling down
+    cluster.simAddNode();
+    await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    // wait for listener to capture the SUCCEEDED stage
+    cluster.getTimeSource().sleep(2000);
+
+    // there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
+    capturedEvents = listenerEvents.get("bar");
+    assertNotNull("no events captured by listener 'bar': " + listenerEvents, capturedEvents);
+    assertTrue(capturedEvents.toString(), capturedEvents.size() > 1);
+    for (int i = 0; i < capturedEvents.size() - 1; i++) {
+      CapturedEvent ev = capturedEvents.get(i);
+      assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
+      assertTrue(ev.toString(), ev.message != null && ev.message.contains("cooldown"));
+    }
+    CapturedEvent ev = capturedEvents.get(capturedEvents.size() - 1);
+    assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
+    // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
+    // must be larger than cooldown period
+    assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
+  }
+
+  /**
+   * Action for the searchRate trigger: records the event, fails if it is processed before the
+   * configured waitFor period (minus a tolerance) has elapsed, then counts down the fired latch.
+   */
+  public static class TestSearchRateAction extends TriggerActionBase {
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) throws Exception {
+      try {
+        events.add(event);
+        // both values come from the cluster time source / event and are treated as nanoseconds
+        long elapsedNanos = cluster.getTimeSource().getTime() - event.getEventTime();
+        long minimumNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+        if (elapsedNanos <= minimumNanos) {
+          fail(event.getSource() + " was fired before the configured waitFor period");
+        }
+        getTriggerFiredLatch().countDown();
+      } catch (Throwable t) {
+        log.debug("--throwable", t);
+        throw t;
+      }
+    }
+  }
+
+  /**
+   * Verifies the searchRate trigger: after a high query rate value is set for
+   * {@code collection1} in the simulator, the trigger must fire no earlier than its waitFor
+   * period, and the per-node, per-shard, per-replica and per-collection rates reported in the
+   * event must agree with each other (within a tolerance of 5.0).
+   */
+  @Test
+  public void testSearchRate() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String COLL1 = "collection1";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+        "conf", 1, 2);
+    create.process(solrClient);
+    waitForState(COLL1, 10, TimeUnit.SECONDS, clusterShape(1, 2));
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'search_rate_trigger'," +
+        "'event' : 'searchRate'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'rate' : 1.0," +
+        "'actions' : [" +
+        "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String setListenerCommand1 = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'srt'," +
+        "'trigger' : 'search_rate_trigger'," +
+        "'stage' : ['FAILED','SUCCEEDED']," +
+        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    // Instead of issuing real queries, set the collection's request-rate value directly in the simulator.
+    cluster.getSimClusterStateProvider().simSetCollectionValue(COLL1, "QUERY./select.requestTimes:1minRate", 500, true);
+
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    // wait for listener to capture the SUCCEEDED stage
+    cluster.getTimeSource().sleep(2000);
+    List<CapturedEvent> srtEvents = listenerEvents.get("srt");
+    assertNotNull(listenerEvents.toString(), srtEvents);
+    assertEquals(listenerEvents.toString(), 1, srtEvents.size());
+    CapturedEvent ev = srtEvents.get(0);
+    // verify waitFor: the listener timestamp and the event time are nanosecond values, so
+    // waitForSeconds must be converted SECONDS -> NANOSECONDS (the reverse conversion yields 0)
+    long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+    long elapsedNanos = ev.timestamp - ev.event.getEventTime();
+    assertTrue("event processed " + elapsedNanos + "ns after creation, expected at least "
+        + waitForNanos + "ns: " + ev, waitForNanos <= elapsedNanos);
+    Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get("node");
+    assertNotNull("nodeRates", nodeRates);
+    assertTrue(nodeRates.toString(), nodeRates.size() > 0);
+    AtomicDouble totalNodeRate = new AtomicDouble();
+    nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
+    List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get("replica");
+    assertNotNull("replicaRates", replicaRates);
+    assertTrue(replicaRates.toString(), replicaRates.size() > 0);
+    AtomicDouble totalReplicaRate = new AtomicDouble();
+    replicaRates.forEach(r -> {
+      assertTrue(r.toString(), r.getVariable("rate") != null);
+      totalReplicaRate.addAndGet((Double)r.getVariable("rate"));
+    });
+    Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get("shard");
+    assertNotNull("shardRates", shardRates);
+    assertEquals(shardRates.toString(), 1, shardRates.size());
+    shardRates = (Map<String, Object>)shardRates.get(COLL1);
+    assertNotNull("shardRates", shardRates);
+    assertEquals(shardRates.toString(), 1, shardRates.size());
+    AtomicDouble totalShardRate = new AtomicDouble();
+    shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double)r));
+    Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get("collection");
+    assertNotNull("collectionRates", collectionRates);
+    assertEquals(collectionRates.toString(), 1, collectionRates.size());
+    Double collectionRate = collectionRates.get(COLL1);
+    assertNotNull(collectionRate);
+    assertTrue(collectionRate > 5.0);
+    assertEquals(collectionRate, totalNodeRate.get(), 5.0);
+    assertEquals(collectionRate, totalShardRate.get(), 5.0);
+    assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/package-info.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/package-info.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/package-info.java
new file mode 100644
index 0000000..0b412cb
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Simulated environment for autoscaling tests.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
index 42af083..c242809 100644
--- a/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/cdcr/BaseCdcrDistributedZkTest.java
@@ -59,6 +59,7 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
@@ -794,7 +795,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
protected void waitForBootstrapToComplete(String collectionName, String shardId) throws Exception {
NamedList rsp;// we need to wait until bootstrap is complete otherwise the replicator thread will never start
- TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
+ TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
rsp = invokeCdcrAction(shardToLeaderJetty.get(collectionName).get(shardId), CdcrParams.CdcrAction.BOOTSTRAP_STATUS);
if (rsp.get(RESPONSE_STATUS).toString().equals(COMPLETED)) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java
index 329de79..500655d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/StressHdfsTest.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.BadHdfsThreadsFilter;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
@@ -221,7 +222,7 @@ public class StressHdfsTest extends BasicDistributedZkTest {
request.setPath("/admin/collections");
cloudClient.request(request);
- final TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS);
+ final TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (cloudClient.getZkStateReader().getClusterState().hasCollection(DELETE_DATA_DIR_COLLECTION)) {
if (timeout.hasTimedOut()) {
throw new AssertionError("Timeout waiting to see removed collection leave clusterstate");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 906e27b..0639479 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -33,6 +33,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeOut;
@@ -213,7 +214,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
writer.writePendingUpdates();
boolean found = false;
- TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS);
+ TimeOut timeOut = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
DocCollection c1 = reader.getClusterState().getCollection("c1");
if ("y".equals(c1.getStr("x"))) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
index 76c5c0f..626374c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/rule/RuleEngineTest.java
@@ -201,6 +201,9 @@ public class RuleEngineTest extends SolrTestCaseJ4{
public NodeStateProvider getNodeStateProvider() {
return new NodeStateProvider() {
@Override
+ public void close() throws IOException {
+ // intentionally a no-op in this test mock
+ }
+
+ @Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
return (Map<String, Object>) MockSnitch.nodeVsTags.get(node);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java b/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
index 0bdf90c..f85b293 100644
--- a/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
+++ b/solr/core/src/test/org/apache/solr/core/OpenCloseCoreStressTest.java
@@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.Before;
@@ -318,8 +319,8 @@ class Indexer {
ArrayList<OneIndexer> _threads = new ArrayList<>();
public Indexer(OpenCloseCoreStressTest OCCST, String url, List<HttpSolrClient> clients, int numThreads, int secondsToRun, Random random) {
- stopTimeout = new TimeOut(secondsToRun, TimeUnit.SECONDS);
- nextTimeout = new TimeOut(60, TimeUnit.SECONDS);
+ stopTimeout = new TimeOut(secondsToRun, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ nextTimeout = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
docsThisCycle.set(0);
qTimesAccum.set(0);
updateCounts.set(0);
@@ -353,7 +354,7 @@ class Indexer {
log.info(String.format(Locale.ROOT, " s indexed: [run %,8d] [cycle %,8d] [last minute %,8d] Last core updated: %s. Seconds left in cycle %,4d",
myId, docsThisCycle.get(), myId - lastCount, core, stopTimeout.timeLeft(TimeUnit.SECONDS)));
lastCount = myId;
- nextTimeout = new TimeOut(60, TimeUnit.SECONDS);
+ nextTimeout = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
index e439d03..01f9199 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
@@ -520,6 +520,15 @@ public class AutoScalingConfig implements MapWriter {
return withTriggerListenerConfigs(configs);
}
+  /**
+   * Returns a new {@link AutoScalingConfig} equivalent to this one. It is rebuilt from the
+   * original JSON map when that is present. Otherwise it is rebuilt from the policy, trigger
+   * configs, listener configs, properties and znode version.
+   * NOTE(review): the JSON-map path passes the same map instance to the copy (no deep copy),
+   * and whether zkVersion survives depends on the map constructor - confirm.
+   */
+  @Override
+  public Object clone() {
+    return jsonMap == null
+        ? new AutoScalingConfig(getPolicy(), getTriggerConfigs(), getTriggerListenerConfigs(), getProperties(), zkVersion)
+        : new AutoScalingConfig(jsonMap);
+  }
+
/**
* Return the znode version that was used to create this configuration.
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
index 09b6193..17c48d5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingCloudManager.java
@@ -24,6 +24,7 @@ import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.util.ObjectCache;
+import org.apache.solr.common.util.TimeSource;
/**
* Base class for overriding some behavior of {@link SolrCloudManager}.
@@ -31,6 +32,7 @@ import org.apache.solr.common.util.ObjectCache;
public class DelegatingCloudManager implements SolrCloudManager {
private final SolrCloudManager delegate;
private ObjectCache objectCache = new ObjectCache();
+ // fallback returned by getTimeSource() only when there is no delegate
+ private TimeSource timeSource = TimeSource.NANO_TIME;
public DelegatingCloudManager(SolrCloudManager delegate) {
this.delegate = delegate;
@@ -62,6 +64,16 @@ public class DelegatingCloudManager implements SolrCloudManager {
}
@Override
+ public boolean isClosed() {
+ // closed state is reported entirely by the wrapped manager
+ return delegate.isClosed();
+ }
+
+  /**
+   * Returns the delegate's time source. When this manager has no delegate, falls back to its
+   * own {@code timeSource} field.
+   */
+  @Override
+  public TimeSource getTimeSource() {
+    if (delegate != null) {
+      return delegate.getTimeSource();
+    }
+    return timeSource;
+  }
+
+ @Override
public SolrResponse request(SolrRequest req) throws IOException {
return delegate.request(req);
}
@@ -70,4 +82,9 @@ public class DelegatingCloudManager implements SolrCloudManager {
public byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) throws IOException {
return delegate.httpRequest(url, method, headers, payload, timeout, followRedirects);
}
+
+ /** Closes the wrapped {@link SolrCloudManager} by delegating to its {@code close()}. */
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
index b47d1c8..2fea23b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingDistribStateManager.java
@@ -45,6 +45,11 @@ public class DelegatingDistribStateManager implements DistribStateManager {
}
@Override
+ public List<String> listData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+ // pass-through: the watcher is handed unchanged to the delegate
+ return delegate.listData(path, watcher);
+ }
+
+ @Override
public VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
return delegate.getData(path, watcher);
}
@@ -60,12 +65,17 @@ public class DelegatingDistribStateManager implements DistribStateManager {
}
@Override
+ public void makePath(String path, byte[] data, CreateMode createMode, boolean failOnExists) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
+ // pass-through: all arguments, including failOnExists, are forwarded unchanged
+ delegate.makePath(path, data, createMode, failOnExists);
+ }
+
+ @Override
public String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException {
return delegate.createData(path, data, mode);
}
@Override
- public void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
+ public void removeData(String path, int version) throws NoSuchElementException, IOException, BadVersionException, KeeperException, InterruptedException {
delegate.removeData(path, version);
}
@@ -88,4 +98,9 @@ public class DelegatingDistribStateManager implements DistribStateManager {
public AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
return delegate.getAutoScalingConfig();
}
+
+ /** Closes the wrapped {@link DistribStateManager} by delegating to its {@code close()}. */
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java
index 8b717f8..9ffde0f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DelegatingNodeStateProvider.java
@@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -40,4 +41,14 @@ public class DelegatingNodeStateProvider implements NodeStateProvider {
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
return delegate.getReplicaInfo(node, keys);
}
+
+ @Override
+ public void close() throws IOException { // closing this wrapper also closes the wrapped provider
+ delegate.close();
+ }
+
+ @Override
+ public boolean isClosed() { // reports the delegate's closed state; this wrapper keeps none of its own
+ return delegate.isClosed();
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java
index 4318418..26aaead 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/DistribStateManager.java
@@ -16,11 +16,11 @@
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
-import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
+import org.apache.solr.common.SolrCloseable;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
@@ -30,7 +30,7 @@ import org.apache.zookeeper.Watcher;
/**
* This interface represents a distributed state repository.
*/
-public interface DistribStateManager extends Closeable {
+public interface DistribStateManager extends SolrCloseable { // close() is no longer defaulted in this interface; implementations must provide it
// state accessors
@@ -38,6 +38,8 @@ public interface DistribStateManager extends Closeable {
List<String> listData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException;
+ List<String> listData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException; // NOTE(review): presumably also registers watcher on the children of path - confirm
+
VersionedData getData(String path, Watcher watcher) throws NoSuchElementException, IOException, KeeperException, InterruptedException;
default VersionedData getData(String path) throws NoSuchElementException, IOException, KeeperException, InterruptedException {
@@ -48,9 +50,19 @@ public interface DistribStateManager extends Closeable {
void makePath(String path) throws AlreadyExistsException, IOException, KeeperException, InterruptedException;
+ void makePath(String path, byte[] data, CreateMode createMode, boolean failOnExists) throws AlreadyExistsException, IOException, KeeperException, InterruptedException; // NOTE(review): failOnExists presumably selects whether an existing path raises AlreadyExistsException - confirm
+
+ /**
+ * Create data (leaf) node at specified path.
+ * @param path base path name of the node.
+ * @param data data to be stored.
+ * @param mode creation mode.
+ * @return actual path of the node - in case of sequential nodes this will differ from the base path because
+ * of the appended sequence number.
+ */
String createData(String path, byte[] data, CreateMode mode) throws AlreadyExistsException, IOException, KeeperException, InterruptedException;
- void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException;
+ void removeData(String path, int version) throws NoSuchElementException, IOException, KeeperException, InterruptedException, BadVersionException; // NOTE(review): BadVersionException presumably signals a version mismatch - confirm
void setData(String path, byte[] data, int version) throws BadVersionException, NoSuchElementException, IOException, KeeperException, InterruptedException;
@@ -61,9 +73,4 @@ public interface DistribStateManager extends Closeable {
default AutoScalingConfig getAutoScalingConfig() throws InterruptedException, IOException {
return getAutoScalingConfig(null);
}
-
- @Override
- default void close() throws IOException {
-
- }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java
index dbf6836..68dfa39 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/NodeStateProvider.java
@@ -20,10 +20,12 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.apache.solr.common.SolrCloseable;
+
/**
* This interface models the access to node and replica information.
*/
-public interface NodeStateProvider {
+public interface NodeStateProvider extends SolrCloseable { // providers are now closeable resources (see DelegatingNodeStateProvider close/isClosed)
/**
* Get the value of each tag for a given node
*
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index d73ae6c..f11121d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -18,6 +18,7 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -37,9 +38,12 @@ import java.util.stream.Collectors;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@@ -56,6 +60,8 @@ import static java.util.stream.Collectors.toList;
*
*/
public class Policy implements MapWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
public static final String POLICY = "policy";
public static final String EACH = "#EACH";
public static final String ANY = "#ANY";
@@ -211,16 +217,26 @@ public class Policy implements MapWriter {
Set<String> collections = new HashSet<>();
List<Clause> expandedClauses;
List<Violation> violations = new ArrayList<>();
+ final int znodeVersion; // ZK version of the cluster state this session was built from; -1 if it could not be read
private Session(List<String> nodes, SolrCloudManager cloudManager,
- List<Row> matrix, List<Clause> expandedClauses) {
+ List<Row> matrix, List<Clause> expandedClauses, int znodeVersion) {
this.nodes = nodes;
this.cloudManager = cloudManager;
this.matrix = matrix;
this.expandedClauses = expandedClauses;
+ this.znodeVersion = znodeVersion;
}
Session(SolrCloudManager cloudManager) {
+ ClusterState state = null;
+ try {
+ state = cloudManager.getClusterStateProvider().getClusterState();
+ LOG.trace("-- session created with cluster state: {}", state);
+ } catch (Exception e) { // best effort: on failure the version falls back to -1 below
+ LOG.trace("-- session created, can't obtain cluster state", e);
+ }
+ this.znodeVersion = state != null ? state.getZNodeVersion() : -1;
this.nodes = new ArrayList<>(cloudManager.getClusterStateProvider().getLiveNodes());
this.cloudManager = cloudManager;
for (String node : nodes) {
@@ -256,7 +272,7 @@ public class Policy implements MapWriter {
}
Session copy() {
- return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses);
+ return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion); // the copy keeps the source session's state version
}
List<Row> getMatrixCopy() {
@@ -297,6 +313,7 @@ public class Policy implements MapWriter {
@Override
public void writeMap(EntryWriter ew) throws IOException {
+ ew.put("znodeVersion", znodeVersion); // report the cluster state version alongside the per-node rows
for (int i = 0; i < matrix.size(); i++) {
Row row = matrix.get(i);
ew.put(row.node, row);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index 024c6c3..40ca619 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -30,12 +30,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
-import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,7 +138,7 @@ public class PolicyHelper {
- public static final int SESSION_EXPIRY = 180;//3 seconds
+ public static final int SESSION_EXPIRY = 180;// in seconds (3 minutes); compared via TimeUnit.SECONDS in get()
- public static MapWriter getDiagnostics(Policy policy, SolrClientCloudManager cloudManager) {
+ public static MapWriter getDiagnostics(Policy policy, SolrCloudManager cloudManager) {
Policy.Session session = policy.createSession(cloudManager);
List<Row> sorted = session.getSorted();
List<Violation> violations = session.getViolations();
@@ -233,9 +233,10 @@ public class PolicyHelper {
*
*/
private void returnSession(SessionWrapper sessionWrapper) {
+ TimeSource timeSource = sessionWrapper.session != null ? sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME; // same null-session fallback as SessionWrapper(...) and update(...)
synchronized (lockObj) {
sessionWrapper.status = Status.EXECUTING;
- log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(MILLISECONDS),
+ log.info("returnSession, curr-time {} sessionWrapper.createTime {}, this.sessionWrapper.createTime {} ", time(timeSource, MILLISECONDS),
sessionWrapper.createTime,
this.sessionWrapper.createTime);
if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
@@ -255,13 +256,14 @@ public class PolicyHelper {
public SessionWrapper get(SolrCloudManager cloudManager) throws IOException, InterruptedException {
+ TimeSource timeSource = cloudManager.getTimeSource(); // assumes getTime() is in nanoseconds, as the NANOSECONDS conversion below expects - confirm
synchronized (lockObj) {
if (sessionWrapper.status == Status.NULL ||
- TimeUnit.SECONDS.convert(System.nanoTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
+ TimeUnit.SECONDS.convert(timeSource.getTime() - sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
//no session available or the session is expired
return createSession(cloudManager);
} else {
- long waitStart = time(MILLISECONDS);
+ long waitStart = time(timeSource, MILLISECONDS);
//the session is not expired
log.debug("reusing a session {}", this.sessionWrapper.createTime);
if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
@@ -269,13 +271,13 @@ public class PolicyHelper {
return sessionWrapper;
} else {
//status= COMPUTING it's being used for computing. computing is
- log.debug("session being used. waiting... current time {} ", time(MILLISECONDS));
+ log.debug("session being used. waiting... current time {} ", time(timeSource, MILLISECONDS));
try {
lockObj.wait(10 * 1000);//wait for a max of 10 seconds
} catch (InterruptedException e) {
log.info("interrupted... ");
}
- log.debug("out of waiting curr-time:{} time-elapsed {}", time(MILLISECONDS), timeElapsed(waitStart, MILLISECONDS));
+ log.debug("out of waiting curr-time:{} time-elapsed {}", time(timeSource, MILLISECONDS), timeElapsed(timeSource, waitStart, MILLISECONDS));
// now this thread has woken up because it got timed out after 10 seconds or it is notified after
//the session was returned from another COMPUTING operation
if (this.sessionWrapper.status == Status.UNUSED || this.sessionWrapper.status == Status.EXECUTING) {
@@ -289,8 +291,6 @@ public class PolicyHelper {
}
}
}
-
-
}
private SessionWrapper createSession(SolrCloudManager cloudManager) throws InterruptedException, IOException {
@@ -361,7 +361,9 @@ public class PolicyHelper {
}
public SessionWrapper(Policy.Session session, SessionRef ref) {
- lastUpdateTime = createTime = System.nanoTime();
+ lastUpdateTime = createTime = session != null ? // stamp with the session's clock when there is a session
+ session.cloudManager.getTimeSource().getTime() :
+ TimeSource.NANO_TIME.getTime();
this.session = session;
this.status = Status.UNUSED;
this.ref = ref;
@@ -372,7 +374,9 @@ public class PolicyHelper {
}
public SessionWrapper update(Policy.Session session) {
- this.lastUpdateTime = System.nanoTime();
+ this.lastUpdateTime = session != null ? // same clock selection as the constructor
+ session.cloudManager.getTimeSource().getTime() :
+ TimeSource.NANO_TIME.getTime();
this.session = session;
return this;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
index 5f7281f..930ede8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaInfo.java
@@ -18,6 +18,7 @@
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@@ -33,21 +34,25 @@ public class ReplicaInfo implements MapWriter {
private String core, collection, shard;
private Replica.Type type;
private String node;
- private Map<String, Object> variables;
+ private final Map<String, Object> variables = new HashMap<>(); // defensive copy of the caller's values; never null
- public ReplicaInfo(String coll,String shard, Replica r, Map<String, Object> vals){
+ public ReplicaInfo(String coll, String shard, Replica r, Map<String, Object> vals) {
this.name = r.getName();
this.core = r.getCoreName();
this.collection = coll;
this.shard = shard;
this.type = r.getType();
- this.variables = vals;
+ if (vals != null) {
+ this.variables.putAll(vals);
+ }
this.node = r.getNodeName();
}
public ReplicaInfo(String name, String core, String coll, String shard, Replica.Type type, String node, Map<String, Object> vals) {
this.name = name;
- this.variables = vals;
+ if (vals != null) {
+ this.variables.putAll(vals);
+ }
this.collection = coll;
this.shard = shard;
this.type = type;
@@ -58,12 +63,24 @@ public class ReplicaInfo implements MapWriter {
@Override
public void writeMap(EntryWriter ew) throws IOException {
ew.put(name, (MapWriter) ew1 -> {
- if (variables != null) {
- for (Map.Entry<String, Object> e : variables.entrySet()) {
- ew1.put(e.getKey(), e.getValue());
- }
+ for (Map.Entry<String, Object> e : variables.entrySet()) { // explicit variables win; the fields below only fill in missing keys
+ ew1.put(e.getKey(), e.getValue());
+ }
+ if (core != null && !variables.containsKey(ZkStateReader.CORE_NAME_PROP)) {
+ ew1.put(ZkStateReader.CORE_NAME_PROP, core);
+ }
+ if (shard != null && !variables.containsKey(ZkStateReader.SHARD_ID_PROP)) {
+ ew1.put(ZkStateReader.SHARD_ID_PROP, shard);
+ }
+ if (collection != null && !variables.containsKey(ZkStateReader.COLLECTION_PROP)) {
+ ew1.put(ZkStateReader.COLLECTION_PROP, collection);
+ }
+ if (node != null && !variables.containsKey(ZkStateReader.NODE_NAME_PROP)) {
+ ew1.put(ZkStateReader.NODE_NAME_PROP, node);
}
- if (type != null) ew1.put("type", type.toString());
+ if (type != null && !variables.containsKey(ZkStateReader.REPLICA_TYPE)) { // guard like the other keys so "type" is never written twice
+ ew1.put(ZkStateReader.REPLICA_TYPE, type.toString());
+ }
});
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java
index 8a1f8f0..55cdcee 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/SolrCloudManager.java
@@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.util.ObjectCache;
+import org.apache.solr.common.util.TimeSource;
/**
* This interface abstracts the access to a SolrCloud cluster, including interactions with Zookeeper, Solr
@@ -44,16 +45,11 @@ public interface SolrCloudManager extends SolrCloseable {
ObjectCache getObjectCache();
+ TimeSource getTimeSource(); // clock used for time-based logic (e.g. PolicyHelper session expiry) instead of System.nanoTime()
+
// Solr-like methods
SolrResponse request(SolrRequest req) throws IOException;
byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) throws IOException;
-
- // distributed queue implementation
-
- @Override
- default void close() {
-
- }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
index aec5f15..070869a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggestion.java
@@ -38,7 +38,7 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.ANY;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
public class Suggestion {
- static final String coreidxsize = "INDEX.sizeInBytes";
+ public static final String coreidxsize = "INDEX.sizeInBytes"; // now public; presumably the metric key for a core's index size in bytes - confirm
static final Map<String, ConditionType> validatetypes = new HashMap<>();
public static ConditionType getTagType(String name) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 2432fb2..dfe15df 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -81,6 +81,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -1030,13 +1031,13 @@ public class CloudSolrClient extends SolrClient {
if (!liveNodes.isEmpty()) {
List<String> liveNodesList = new ArrayList<>(liveNodes);
Collections.shuffle(liveNodesList, rand);
- theUrlList.add(ZkStateReader.getBaseUrlForNodeName(liveNodesList.get(0),
+ theUrlList.add(Utils.getBaseUrlForNodeName(liveNodesList.get(0),
(String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
}
} else if (ADMIN_PATHS.contains(request.getPath())) {
for (String liveNode : liveNodes) {
- theUrlList.add(ZkStateReader.getBaseUrlForNodeName(liveNode,
+ theUrlList.add(Utils.getBaseUrlForNodeName(liveNode,
(String) stateProvider.getClusterProperty(ZkStateReader.URL_SCHEME,"http")));
}