You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/07/13 15:52:36 UTC
[1/2] lucene-solr:feature/autoscaling: SOLR-11000: Changes made via
AutoScalingHandler should be atomic.
Repository: lucene-solr
Updated Branches:
refs/heads/feature/autoscaling d2849cd45 -> 2590a430f
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 4b7c0d0..30dd85c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -40,6 +40,8 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
@@ -150,7 +152,7 @@ public class ScheduledTriggers implements Closeable {
ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
if (scheduledSource == null) {
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, msg);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, msg);
log.warn(msg);
return false;
}
@@ -158,13 +160,13 @@ public class ScheduledTriggers implements Closeable {
AutoScaling.Trigger source = scheduledSource.trigger;
if (source.isClosed()) {
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.ABORTED, msg);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
log.warn(msg);
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
if (hasPendingActions.compareAndSet(false, true)) {
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.STARTED);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);
final boolean enqueued;
if (replaying) {
enqueued = false;
@@ -182,21 +184,21 @@ public class ScheduledTriggers implements Closeable {
actionThrottle.markAttemptingAction();
ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
for (TriggerAction action : actions) {
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
try {
action.process(event, actionContext);
} catch (Exception e) {
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, action.getName(), actionContext, e, null);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, action.getName(), actionContext, e, null);
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
throw e;
}
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
}
if (enqueued) {
TriggerEvent ev = scheduledTrigger.dequeue();
assert ev.getId().equals(event.getId());
}
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
} finally {
hasPendingActions.set(false);
}
@@ -209,7 +211,7 @@ public class ScheduledTriggers implements Closeable {
+ " is broken! Expected event=" + event + " but got " + ev);
}
}
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
hasPendingActions.set(false);
}
return true;
@@ -371,7 +373,7 @@ public class ScheduledTriggers implements Closeable {
}
private class TriggerListeners {
- Map<String, Map<AutoScaling.EventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
+ Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
Map<String, TriggerListener> listenersPerName = new HashMap<>();
ReentrantLock updateLock = new ReentrantLock();
@@ -441,15 +443,15 @@ public class ScheduledTriggers implements Closeable {
continue;
}
// add per stage
- for (AutoScaling.EventProcessorStage stage : config.stages) {
+ for (TriggerEventProcessorStage stage : config.stages) {
addPerStage(config.trigger, stage, listener);
}
// add also for beforeAction / afterAction TriggerStage
if (!config.beforeActions.isEmpty()) {
- addPerStage(config.trigger, AutoScaling.EventProcessorStage.BEFORE_ACTION, listener);
+ addPerStage(config.trigger, TriggerEventProcessorStage.BEFORE_ACTION, listener);
}
if (!config.afterActions.isEmpty()) {
- addPerStage(config.trigger, AutoScaling.EventProcessorStage.AFTER_ACTION, listener);
+ addPerStage(config.trigger, TriggerEventProcessorStage.AFTER_ACTION, listener);
}
}
} finally {
@@ -457,8 +459,8 @@ public class ScheduledTriggers implements Closeable {
}
}
- private void addPerStage(String triggerName, AutoScaling.EventProcessorStage stage, TriggerListener listener) {
- Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage =
+ private void addPerStage(String triggerName, TriggerEventProcessorStage stage, TriggerListener listener) {
+ Map<TriggerEventProcessorStage, List<TriggerListener>> perStage =
listenersPerStage.computeIfAbsent(triggerName, k -> new HashMap<>());
List<TriggerListener> lst = perStage.computeIfAbsent(stage, k -> new ArrayList<>(3));
lst.add(listener);
@@ -481,8 +483,8 @@ public class ScheduledTriggers implements Closeable {
reset();
}
- List<TriggerListener> getTriggerListeners(String trigger, AutoScaling.EventProcessorStage stage) {
- Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
+ List<TriggerListener> getTriggerListeners(String trigger, TriggerEventProcessorStage stage) {
+ Map<TriggerEventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
if (perStage == null) {
return Collections.emptyList();
}
@@ -494,31 +496,31 @@ public class ScheduledTriggers implements Closeable {
}
}
- void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage) {
+ void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage) {
fireListeners(trigger, event, stage, null, null, null, null);
}
- void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String message) {
+ void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String message) {
fireListeners(trigger, event, stage, null, null, null, message);
}
- void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context) {
fireListeners(trigger, event, stage, actionName, context, null, null);
}
- void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
updateLock.lock();
try {
for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
if (actionName != null) {
AutoScalingConfig.TriggerListenerConfig config = listener.getConfig();
- if (stage == AutoScaling.EventProcessorStage.BEFORE_ACTION) {
+ if (stage == TriggerEventProcessorStage.BEFORE_ACTION) {
if (!config.beforeActions.contains(actionName)) {
continue;
}
- } else if (stage == AutoScaling.EventProcessorStage.AFTER_ACTION) {
+ } else if (stage == TriggerEventProcessorStage.AFTER_ACTION) {
if (!config.afterActions.contains(actionName)) {
continue;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
index fa27759..12d4fef 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.MapWriter;
import org.apache.solr.util.IdUtils;
@@ -33,15 +34,15 @@ public class TriggerEvent implements MapWriter {
protected final String id;
protected final String source;
protected final long eventTime;
- protected final AutoScaling.EventType eventType;
+ protected final TriggerEventType eventType;
protected final Map<String, Object> properties = new HashMap<>();
- public TriggerEvent(AutoScaling.EventType eventType, String source, long eventTime,
+ public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
Map<String, Object> properties) {
this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties);
}
- public TriggerEvent(String id, AutoScaling.EventType eventType, String source, long eventTime,
+ public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
Map<String, Object> properties) {
this.id = id;
this.eventType = eventType;
@@ -92,7 +93,7 @@ public class TriggerEvent implements MapWriter {
/**
* Event type.
*/
- public AutoScaling.EventType getEventType() {
+ public TriggerEventType getEventType() {
return eventType;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
index 3a73f54..99f641c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -3,6 +3,7 @@ package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.Map;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -91,7 +92,7 @@ public class TriggerEventQueue extends DistributedQueue {
String id = (String)map.get("id");
String source = (String)map.get("source");
long eventTime = ((Number)map.get("eventTime")).longValue();
- AutoScaling.EventType eventType = AutoScaling.EventType.valueOf((String)map.get("eventType"));
+ TriggerEventType eventType = TriggerEventType.valueOf((String)map.get("eventType"));
Map<String, Object> properties = (Map<String, Object>)map.get("properties");
TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties);
res.getProperties().put(DEQUEUE_TIME, timeSource.getTime());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
index 479de49..3688bfc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
@@ -18,6 +18,8 @@ package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.core.CoreContainer;
/**
@@ -34,12 +36,12 @@ public interface TriggerListener extends Closeable {
* This method is called when either a particular <code>stage</code> or
* <code>actionName</code> is reached during event processing.
* @param event current event being processed
- * @param stage {@link AutoScaling.EventProcessorStage} that this listener was registered for, or null
+ * @param stage {@link TriggerEventProcessorStage} that this listener was registered for, or null
* @param actionName {@link TriggerAction} name that this listener was registered for, or null
* @param context optional {@link ActionContext} when the processing stage is related to an action, or null
* @param error optional {@link Throwable} error, or null
* @param message optional message
*/
- void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
+ void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context,
Throwable error, String message) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
index 1cefa0e..01a4413 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.core.CoreContainer;
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 7f831df..e67912f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.common.cloud.ClusterState;
@@ -72,6 +73,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
private static ZkStateReader zkStateReaderMock;
private static ClusterState clusterStateMock;
private static SolrZkClient solrZkClientMock;
+ private static AutoScalingConfig autoScalingConfig = new AutoScalingConfig(Collections.emptyMap());
private final Map zkMap = new HashMap();
private final Map<String, ClusterState.CollectionRef> collectionsSet = new HashMap<>();
private final List<ZkNodeProps> replicas = new ArrayList<>();
@@ -197,6 +199,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
when(zkStateReaderMock.getZkClient()).thenReturn(solrZkClientMock);
when(zkStateReaderMock.getClusterState()).thenReturn(clusterStateMock);
+ when(zkStateReaderMock.getAutoScalingConfig()).thenReturn(autoScalingConfig);
when(clusterStateMock.getCollection(anyString())).thenAnswer(invocation -> {
String key = invocation.getArgument(0);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
index ef7e3e9..460cc80 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanActionTest.java
@@ -20,11 +20,11 @@ package org.apache.solr.cloud.autoscaling;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.CloudDescriptor;
@@ -126,7 +126,7 @@ public class AutoAddReplicasPlanActionTest extends SolrCloudTestCase{
private List<SolrRequest> getOperations(JettySolrRunner actionJetty, String lostNodeName) {
AutoAddReplicasPlanAction action = new AutoAddReplicasPlanAction();
- TriggerEvent lostNode = new NodeLostTrigger.NodeLostEvent(AutoScaling.EventType.NODELOST, ".auto_add_replicas", System.currentTimeMillis(), lostNodeName);
+ TriggerEvent lostNode = new NodeLostTrigger.NodeLostEvent(TriggerEventType.NODELOST, ".auto_add_replicas", System.currentTimeMillis(), lostNodeName);
ActionContext context = new ActionContext(actionJetty.getCoreContainer(), null, new HashMap<>());
action.process(lostNode, context);
List<SolrRequest> operations = (List) context.getProperty("operations");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
index 6494923..b953218 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
@@ -22,10 +22,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -41,6 +44,7 @@ import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.ContentStreamBase;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -182,7 +186,8 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
assertEquals(response.get("result").toString(), "success");
assertEquals(response.get("changed").toString(), "[node_lost_trigger]");
- byte[] data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
+ Stat stat = new Stat();
+ byte[] data = zkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
ZkNodeProps loaded = ZkNodeProps.load(data);
Map<String, Object> triggers = (Map<String, Object>) loaded.get("triggers");
assertNotNull(triggers);
@@ -370,7 +375,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
assertEquals(1, listeners.size());
assertTrue(listeners.containsKey("xyz"));
Map<String, Object> xyzListener = (Map<String, Object>) listeners.get("xyz");
- assertEquals(5, xyzListener.size());
+ assertEquals(6, xyzListener.size());
assertEquals("org.apache.solr.cloud.autoscaling.HttpTriggerListener", xyzListener.get("class").toString());
String removeTriggerCommand = "{" +
@@ -711,6 +716,53 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
}
}
+ @Test
+ public void testConcurrentUpdates() throws Exception {
+ int COUNT = 50;
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ CountDownLatch updateLatch = new CountDownLatch(COUNT * 2);
+ Runnable r = () -> {
+ for (int i = 0; i < COUNT; i++) {
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger1'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = null;
+ try {
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ } catch (Exception e) {
+ fail(e.toString());
+ } finally {
+ updateLatch.countDown();
+ }
+ }
+ };
+ Thread t1 = new Thread(r);
+ Thread t2 = new Thread(r);
+ t1.start();
+ t2.start();
+ boolean await = updateLatch.await(60, TimeUnit.SECONDS);
+ assertTrue("not all updates executed in time, remaining=" + updateLatch.getCount(), await);
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
+ NamedList<Object> response = solrClient.request(req);
+
+ Map triggers = (Map) response.get("triggers");
+ assertNotNull(triggers);
+ assertEquals(1, triggers.size());
+ assertTrue(triggers.containsKey("node_added_trigger1"));
+ Map node_added_trigger1 = (Map) triggers.get("node_added_trigger1");
+ assertEquals(4, node_added_trigger1.size());
+ assertEquals(0L, node_added_trigger1.get("waitFor"));
+ assertEquals(true, node_added_trigger1.get("enabled"));
+ assertEquals(3, ((List)node_added_trigger1.get("actions")).size());
+
+ }
+
public static SolrRequest createAutoScalingRequest(SolrRequest.METHOD m, String message) {
return createAutoScalingRequest(m, null, message);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java
index 3816fde..44f675d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ComputePlanActionTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Charsets;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -261,7 +262,7 @@ public class ComputePlanActionTest extends SolrCloudTestCase {
TriggerEvent triggerEvent = eventRef.get();
assertNotNull(triggerEvent);
- assertEquals(AutoScaling.EventType.NODELOST, triggerEvent.getEventType());
+ assertEquals(TriggerEventType.NODELOST, triggerEvent.getEventType());
assertEquals(stoppedNodeName, triggerEvent.getProperty(TriggerEvent.NODE_NAME));
Map context = actionContextPropsRef.get();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
index 6f088ea..6171fac 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -106,7 +107,7 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
action.init(Collections.singletonMap("name", "execute_plan"));
CollectionAdminRequest.MoveReplica moveReplica = new CollectionAdminRequest.MoveReplica(collectionName, replicas.get(0).getName(), survivor.getNodeName());
List<SolrRequest> operations = Collections.singletonList(moveReplica);
- NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent(AutoScaling.EventType.NODELOST,
+ NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent(TriggerEventType.NODELOST,
"mock_trigger_name", TimeSource.CURRENT_TIME.getTime(), sourceNodeName);
ActionContext actionContext = new ActionContext(survivor.getCoreContainer(), null,
new HashMap<>(Collections.singletonMap("operations", operations)));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
index a2beed4..29bfb59 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
@@ -43,7 +43,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
- private AutoScaling.EventProcessor noFirstRunProcessor = event -> {
+ private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
index 2492ce2..e501c64 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
@@ -43,7 +43,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
- private AutoScaling.EventProcessor noFirstRunProcessor = event -> {
+ private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 8ef935a..3671b83 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -925,17 +928,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(1, events.size());
TriggerEvent ev = events.iterator().next();
assertEquals(overseerLeader, ev.getProperty(TriggerEvent.NODE_NAME));
- assertEquals(AutoScaling.EventType.NODELOST, ev.getEventType());
+ assertEquals(TriggerEventType.NODELOST, ev.getEventType());
}
private static class TestEvent {
final AutoScalingConfig.TriggerListenerConfig config;
- final AutoScaling.EventProcessorStage stage;
+ final TriggerEventProcessorStage stage;
final String actionName;
final TriggerEvent event;
final String message;
- TestEvent(AutoScalingConfig.TriggerListenerConfig config, AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
+ TestEvent(AutoScalingConfig.TriggerListenerConfig config, TriggerEventProcessorStage stage, String actionName, TriggerEvent event, String message) {
this.config = config;
this.stage = stage;
this.actionName = actionName;
@@ -967,7 +970,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Override
- public synchronized void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
List<TestEvent> lst = listenerEvents.get(config.name);
if (lst == null) {
@@ -1058,34 +1061,34 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertNotNull("foo events: " + testEvents, testEvents);
assertEquals("foo events: " + testEvents, 5, testEvents.size());
- assertEquals(AutoScaling.EventProcessorStage.STARTED, testEvents.get(0).stage);
+ assertEquals(TriggerEventProcessorStage.STARTED, testEvents.get(0).stage);
- assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
assertEquals("test", testEvents.get(1).actionName);
- assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
assertEquals("test", testEvents.get(2).actionName);
- assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(3).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(3).stage);
assertEquals("test1", testEvents.get(3).actionName);
- assertEquals(AutoScaling.EventProcessorStage.SUCCEEDED, testEvents.get(4).stage);
+ assertEquals(TriggerEventProcessorStage.SUCCEEDED, testEvents.get(4).stage);
// check bar events
testEvents = listenerEvents.get("bar");
assertNotNull("bar events", testEvents);
assertEquals("bar events", 4, testEvents.size());
- assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
assertEquals("test", testEvents.get(0).actionName);
- assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
assertEquals("test", testEvents.get(1).actionName);
- assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
assertEquals("test1", testEvents.get(2).actionName);
- assertEquals(AutoScaling.EventProcessorStage.SUCCEEDED, testEvents.get(3).stage);
+ assertEquals(TriggerEventProcessorStage.SUCCEEDED, testEvents.get(3).stage);
// reset
triggerFired.set(false);
@@ -1104,15 +1107,15 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertNotNull("foo events: " + testEvents, testEvents);
assertEquals("foo events: " + testEvents, 4, testEvents.size());
- assertEquals(AutoScaling.EventProcessorStage.STARTED, testEvents.get(0).stage);
+ assertEquals(TriggerEventProcessorStage.STARTED, testEvents.get(0).stage);
- assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
assertEquals("test", testEvents.get(1).actionName);
- assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
assertEquals("test", testEvents.get(2).actionName);
- assertEquals(AutoScaling.EventProcessorStage.FAILED, testEvents.get(3).stage);
+ assertEquals(TriggerEventProcessorStage.FAILED, testEvents.get(3).stage);
assertEquals("test1", testEvents.get(3).actionName);
// check bar events
@@ -1120,16 +1123,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertNotNull("bar events", testEvents);
assertEquals("bar events", 4, testEvents.size());
- assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
assertEquals("test", testEvents.get(0).actionName);
- assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
+ assertEquals(TriggerEventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
assertEquals("test", testEvents.get(1).actionName);
- assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
+ assertEquals(TriggerEventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
assertEquals("test1", testEvents.get(2).actionName);
- assertEquals(AutoScaling.EventProcessorStage.FAILED, testEvents.get(3).stage);
+ assertEquals(TriggerEventProcessorStage.FAILED, testEvents.get(3).stage);
assertEquals("test1", testEvents.get(3).actionName);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
new file mode 100644
index 0000000..06cfac3
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/AutoScalingConfig.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Bean representation of <code>autoscaling.json</code>, which parses data
+ * lazily.
+ */
+public class AutoScalingConfig implements MapWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final Map<String, Object> jsonMap;
+ private final boolean empty;
+
+ private Policy policy;
+ private Map<String, TriggerConfig> triggers;
+ private Map<String, TriggerListenerConfig> listeners;
+
+ private final int zkVersion;
+
+ /**
+ * Bean representation of trigger listener config.
+ */
+ public static class TriggerListenerConfig implements MapWriter {
+ public final String name;
+ public final String trigger;
+ public final EnumSet<TriggerEventProcessorStage> stages = EnumSet.noneOf(TriggerEventProcessorStage.class);
+ public final String listenerClass;
+ public final Set<String> beforeActions;
+ public final Set<String> afterActions;
+ public final Map<String, Object> properties;
+
+ public TriggerListenerConfig(String name, Map<String, Object> properties) {
+ this.name = name;
+ if (properties == null) {
+ this.properties = Collections.emptyMap();
+ } else {
+ this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
+ }
+ trigger = (String)this.properties.get(AutoScalingParams.TRIGGER);
+ List<String> stageNames = getList(AutoScalingParams.STAGE, this.properties);
+ for (String stageName : stageNames) {
+ try {
+ TriggerEventProcessorStage stage = TriggerEventProcessorStage.valueOf(stageName.toUpperCase(Locale.ROOT));
+ stages.add(stage);
+ } catch (Exception e) {
+ LOG.warn("Invalid stage name '" + name + "' in listener config, skipping: " + properties);
+ }
+ }
+ listenerClass = (String)this.properties.get(AutoScalingParams.CLASS);
+ beforeActions = Collections.unmodifiableSet(new HashSet<>(getList(AutoScalingParams.BEFORE_ACTION, this.properties)));
+ afterActions = Collections.unmodifiableSet(new HashSet<>(getList(AutoScalingParams.AFTER_ACTION, this.properties)));
+ }
+
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ // don't write duplicate entries - skip explicit fields if their values
+ // are already contained in properties
+// if (!properties.containsKey(AutoScalingParams.NAME)) {
+// ew.put(AutoScalingParams.NAME, name);
+// }
+ if (!properties.containsKey(AutoScalingParams.CLASS)) {
+ ew.put(AutoScalingParams.CLASS, listenerClass);
+ }
+ if (!properties.containsKey(AutoScalingParams.TRIGGER)) {
+ ew.put(AutoScalingParams.TRIGGER, trigger);
+ }
+ if (!properties.containsKey(AutoScalingParams.STAGE)) {
+ ew.put(AutoScalingParams.STAGE, stages);
+ }
+ if (!properties.containsKey(AutoScalingParams.BEFORE_ACTION)) {
+ ew.put(AutoScalingParams.BEFORE_ACTION, beforeActions);
+ }
+ if (!properties.containsKey(AutoScalingParams.AFTER_ACTION)) {
+ ew.put(AutoScalingParams.AFTER_ACTION, afterActions);
+ }
+ // forEach doesn't allow throwing exceptions...
+ for (Map.Entry<String, Object> entry : properties.entrySet()) {
+ ew.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TriggerListenerConfig that = (TriggerListenerConfig) o;
+
+ if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
+ if (!stages.equals(that.stages)) return false;
+ if (listenerClass != null ? !listenerClass.equals(that.listenerClass) : that.listenerClass != null) return false;
+ if (!beforeActions.equals(that.beforeActions)) return false;
+ if (!afterActions.equals(that.afterActions)) return false;
+ return properties.equals(that.properties);
+ }
+ }
+
+ /**
+ * Bean representation of trigger config.
+ */
+ public static class TriggerConfig implements MapWriter {
+ /** Trigger name. */
+ public final String name;
+ /** Trigger event type. */
+ public final TriggerEventType event;
+ /** Enabled flag. */
+ public final boolean enabled;
+ /** List of configured actions, never null. */
+ public final List<ActionConfig> actions;
+ /** Map of additional trigger properties, never null. */
+ public final Map<String, Object> properties;
+
+ public TriggerConfig(String name, Map<String, Object> properties) {
+ this.name = name;
+ if (properties != null) {
+ this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
+ } else {
+ this.properties = Collections.emptyMap();
+ }
+ String event = (String) this.properties.get(AutoScalingParams.EVENT);
+ if (event != null) {
+ TriggerEventType type = null;
+ try {
+ type = TriggerEventType.valueOf(event.toUpperCase(Locale.ROOT));
+ } catch (Exception e) {
+ }
+ if (type == null) {
+ this.event = TriggerEventType.INVALID;
+ } else {
+ this.event = type;
+ }
+ } else {
+ this.event = TriggerEventType.INVALID;
+ }
+ enabled = Boolean.parseBoolean(String.valueOf(this.properties.getOrDefault("enabled", "true")));
+
+ List<Map<String, Object>> newActions = (List<Map<String, Object>>)this.properties.get("actions");
+ if (newActions != null) {
+ this.actions = newActions.stream().map(ActionConfig::new).collect(collectingAndThen(toList(), Collections::unmodifiableList));
+ } else {
+ this.actions = Collections.emptyList();
+ }
+ }
+
+ /**
+ * Create a copy of this config with specified enabled flag.
+ * @param enabled true when enabled, false otherwise.
+ * @return modified copy of the configuration
+ */
+ public TriggerConfig withEnabled(boolean enabled) {
+ Map<String, Object> props = new HashMap<>(properties);
+ props.put(AutoScalingParams.ENABLED, String.valueOf(enabled));
+ return new TriggerConfig(name, props);
+ }
+
+ /**
+ * Create a copy of this config with specified property.
+ * @param key property name
+ * @param value property value
+ * @return modified copy of the configuration
+ */
+ public TriggerConfig withProperty(String key, Object value) {
+ Map<String, Object> props = new HashMap<>(properties);
+ props.put(key, String.valueOf(value));
+ return new TriggerConfig(name, props);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TriggerConfig that = (TriggerConfig) o;
+
+ if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (event != that.event) return false;
+ return properties.equals(that.properties);
+ }
+
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+// if (!properties.containsKey(AutoScalingParams.NAME)) {
+// ew.put(AutoScalingParams.NAME, name);
+// }
+ if (!properties.containsKey(AutoScalingParams.EVENT)) {
+ ew.put(AutoScalingParams.EVENT, event.toString());
+ }
+ // forEach doesn't allow throwing exceptions...
+ for (Map.Entry<String, Object> entry : properties.entrySet()) {
+ ew.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * Bean representation of trigger action configuration.
+ */
+ public static class ActionConfig implements MapWriter {
+ /** Action name. */
+ public final String name;
+ /** Class name of action implementtion. */
+ public final String actionClass;
+ /** Additional action properties. */
+ public final Map<String, Object> properties;
+
+ /**
+ * Construct from a JSON map.
+ * @param properties JSON map with properties - selected properties will be
+ * used for setting the values of <code>name</code> and
+ * <code>actionClass</code>.
+ */
+ public ActionConfig(Map<String, Object> properties) {
+ if (properties != null) {
+ this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
+ } else {
+ this.properties = Collections.emptyMap();
+ }
+ this.name = (String)this.properties.get(AutoScalingParams.NAME);
+ this.actionClass = (String)this.properties.get(AutoScalingParams.CLASS);
+ }
+
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ // forEach doesn't allow throwing exceptions...
+ for (Map.Entry<String, Object> entry : properties.entrySet()) {
+ ew.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ActionConfig that = (ActionConfig) o;
+
+ return properties.equals(that.properties);
+ }
+ }
+
+ /**
+ * Construct from bytes that represent a UTF-8 JSON string.
+ * @param utf8 config data
+ */
+ public AutoScalingConfig(byte[] utf8) {
+ this(utf8 != null && utf8.length > 0 ? (Map<String, Object>)Utils.fromJSON(utf8) : Collections.emptyMap());
+ }
+
+ /**
+ * Construct from a JSON map representation.
+ * @param jsonMap JSON map representation of the config. Note that this map is evaluated lazily, and
+ * outside modifications may cause unpredictable behavior.
+ */
+ public AutoScalingConfig(Map<String, Object> jsonMap) {
+ this.jsonMap = jsonMap;
+ int version = -1;
+ if (jsonMap.containsKey(AutoScalingParams.ZK_VERSION)) {
+ try {
+ version = (Integer)jsonMap.get(AutoScalingParams.ZK_VERSION);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ zkVersion = version;
+ jsonMap.remove(AutoScalingParams.ZK_VERSION);
+ empty = jsonMap.isEmpty();
+ }
+
+ private AutoScalingConfig(Policy policy, Map<String, TriggerConfig> triggerConfigs, Map<String,
+ TriggerListenerConfig> listenerConfigs, int zkVersion) {
+ this.policy = policy;
+ this.triggers = triggerConfigs != null ? Collections.unmodifiableMap(triggerConfigs) : null;
+ this.listeners = listenerConfigs != null ? Collections.unmodifiableMap(listenerConfigs) : null;
+ this.jsonMap = null;
+ this.zkVersion = zkVersion;
+ this.empty = policy == null &&
+ (triggerConfigs == null || triggerConfigs.isEmpty()) &&
+ (listenerConfigs == null || listenerConfigs.isEmpty());
+ }
+
+ /**
+ * Return true if the source <code>autoscaling.json</code> was empty, false otherwise.
+ */
+ public boolean isEmpty() {
+ return empty;
+ }
+
+ /**
+ * Get {@link Policy} configuration.
+ */
+ public Policy getPolicy() {
+ if (policy == null) {
+ if (jsonMap != null) {
+ policy = new Policy(jsonMap);
+ } else {
+ policy = new Policy();
+ }
+ }
+ return policy;
+ }
+
+ /**
+ * Get trigger configurations.
+ */
+ public Map<String, TriggerConfig> getTriggerConfigs() {
+ if (triggers == null) {
+ if (jsonMap != null) {
+ Map<String, Object> trigMap = (Map<String, Object>)jsonMap.get("triggers");
+ if (trigMap == null) {
+ triggers = Collections.emptyMap();
+ } else {
+ HashMap<String, TriggerConfig> newTriggers = new HashMap<>(trigMap.size());
+ for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
+ newTriggers.put(entry.getKey(), new TriggerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
+ }
+ triggers = Collections.unmodifiableMap(newTriggers);
+ }
+ } else {
+ triggers = Collections.emptyMap();
+ }
+ }
+ return triggers;
+ }
+
+ /**
+ * Check whether triggers for specific event type exist.
+ * @param types list of event types
+ * @return true if there's at least one trigger matching at least one event type,
+ * false otherwise,
+ */
+ public boolean hasTriggerForEvents(TriggerEventType... types) {
+ if (types == null || types.length == 0) {
+ return false;
+ }
+ for (TriggerConfig config : getTriggerConfigs().values()) {
+ for (TriggerEventType type : types) {
+ if (config.event.equals(type)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Get listener configurations.
+ */
+ public Map<String, TriggerListenerConfig> getTriggerListenerConfigs() {
+ if (listeners == null) {
+ if (jsonMap != null) {
+ Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
+ if (map == null) {
+ listeners = Collections.emptyMap();
+ } else {
+ HashMap<String, TriggerListenerConfig> newListeners = new HashMap<>(map.size());
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ newListeners.put(entry.getKey(), new TriggerListenerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
+ }
+ this.listeners = Collections.unmodifiableMap(newListeners);
+ }
+ } else {
+ listeners = Collections.emptyMap();
+ }
+ }
+ return listeners;
+ }
+
+ /**
+ * Create a copy of the config with replaced policy.
+ * @param policy new policy
+ * @return modified copy of the configuration
+ */
+ public AutoScalingConfig withPolicy(Policy policy) {
+ return new AutoScalingConfig(policy, getTriggerConfigs(), getTriggerListenerConfigs(), zkVersion);
+ }
+
+ /**
+ * Create a copy of the config with replaced trigger configurations.
+ * @param configs new trigger configurations
+ * @return modified copy of the configuration
+ */
+ public AutoScalingConfig withTriggerConfigs(Map<String, TriggerConfig> configs) {
+ return new AutoScalingConfig(getPolicy(), configs, getTriggerListenerConfigs(), zkVersion);
+ }
+
+ /**
+ * Create a copy of the config with replaced trigger configuration
+ * @param config new trigger configuration
+ * @return modified copy of the configuration
+ */
+ public AutoScalingConfig withTriggerConfig(TriggerConfig config) {
+ Map<String, TriggerConfig> configs = new HashMap<>(getTriggerConfigs());
+ configs.put(config.name, config);
+ return withTriggerConfigs(configs);
+ }
+
+ /**
+ * Create a copy of the config without a trigger configuration.
+ * @param name trigger configuration name
+ * @return modified copy of the configuration, even if the specified config name didn't exist.
+ */
+ public AutoScalingConfig withoutTriggerConfig(String name) {
+ Map<String, TriggerConfig> configs = new HashMap<>(getTriggerConfigs());
+ configs.remove(name);
+ return withTriggerConfigs(configs);
+ }
+
+ /**
+ * Create a copy of the config with replaced trigger listener configurations.
+ * @param configs new trigger listener configurations
+ * @return modified copy of the configuration
+ */
+ public AutoScalingConfig withTriggerListenerConfigs(Map<String, TriggerListenerConfig> configs) {
+ return new AutoScalingConfig(getPolicy(), getTriggerConfigs(), configs, zkVersion);
+ }
+
+ /**
+ * Create a copy of the config with replaced trigger listener configuration.
+ * @param config new trigger listener configuration
+ * @return modified copy of the configuration
+ */
+ public AutoScalingConfig withTriggerListenerConfig(TriggerListenerConfig config) {
+ Map<String, TriggerListenerConfig> configs = new HashMap<>(getTriggerListenerConfigs());
+ configs.put(config.name, config);
+ return withTriggerListenerConfigs(configs);
+ }
+
+ /**
+ * Create a copy of the config without a trigger listener configuration.
+ * @param name trigger listener configuration name
+ * @return modified copy of the configuration, even if the specified config name didn't exist.
+ */
+ public AutoScalingConfig withoutTriggerListenerConfig(String name) {
+ Map<String, TriggerListenerConfig> configs = new HashMap<>(getTriggerListenerConfigs());
+ configs.remove(name);
+ return withTriggerListenerConfigs(configs);
+ }
+
+ /**
+ * Return the znode version that was used to create this configuration.
+ */
+ public int getZkVersion() {
+ return zkVersion;
+ }
+
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ Policy policy = getPolicy();
+ // properties of Policy are expected at top level
+ policy.writeMap(ew);
+
+ ew.put("triggers", getTriggerConfigs());
+ ew.put("listeners", getTriggerListenerConfigs());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AutoScalingConfig that = (AutoScalingConfig) o;
+
+ if (!getPolicy().equals(that.getPolicy())) return false;
+ if (!triggers.equals(that.triggers)) return false;
+ return listeners.equals(that.listeners);
+ }
+
+ private static List<String> getList(String key, Map<String, Object> properties) {
+ return getList(key, properties, null);
+ }
+
+ private static List<String> getList(String key, Map<String, Object> properties, List<String> defaultList) {
+ if (defaultList == null) {
+ defaultList = Collections.emptyList();
+ }
+ Object o = properties.get(key);
+ if (o == null) {
+ return defaultList;
+ }
+ if (o instanceof List) {
+ return (List)o;
+ } else {
+ return Collections.singletonList(String.valueOf(o));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
index 7c82e6c..ab9b4be 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
@@ -20,6 +20,7 @@ package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -51,14 +52,14 @@ import static org.apache.solr.common.params.CoreAdminParams.SHARD;
// a set of conditions in a policy
public class Clause implements MapWriter, Comparable<Clause> {
- public Map<String, Object> original;
- public Condition collection, shard, replica, tag, globalTag;
- public final Replica.Type type;
+ final Map<String, Object> original;
+ Condition collection, shard, replica, tag, globalTag;
+ final Replica.Type type;
boolean strict = true;
public Clause(Map<String, Object> m) {
- this.original = m;
+ this.original = Utils.getDeepCopy(m, 10);
String type = (String) m.get("type");
this.type = type == null || ANY.equals(type) ? null : Replica.Type.valueOf(type.toUpperCase(Locale.ROOT));
strict = Boolean.parseBoolean(String.valueOf(m.getOrDefault("strict", "true")));
@@ -74,7 +75,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
} else {
collection = parse(COLLECTION, m);
shard = parse(SHARD, m);
- if(m.get(REPLICA) == null){
+ if (m.get(REPLICA) == null) {
throw new RuntimeException(StrUtils.formatString("'replica' is required in {0}", Utils.toJSONString(m)));
}
this.replica = parse(REPLICA, m);
@@ -126,10 +127,17 @@ public class Clause implements MapWriter, Comparable<Clause> {
} else {
return 0;
}
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Clause that = (Clause)o;
+ return compareTo(that) == 0;
}
- void addTags(List<String> params) {
+ void addTags(Collection<String> params) {
if (globalTag != null && !params.contains(globalTag.name)) params.add(globalTag.name);
if (tag != null && !params.contains(tag.name)) params.add(tag.name);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index d013e46..3ccf0c1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -46,6 +46,7 @@ import org.apache.solr.common.util.Utils;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
+import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
/*The class that reads, parses and applies policies specified in
@@ -61,55 +62,78 @@ public class Policy implements MapWriter {
public static final String POLICY = "policy";
public static final String EACH = "#EACH";
public static final String ANY = "#ANY";
+ public static final String POLICIES = "policies";
public static final String CLUSTER_POLICY = "cluster-policy";
- public static final String CLUSTER_PREFERENCE = "cluster-preferences";
+ public static final String CLUSTER_PREFERENCES = "cluster-preferences";
public static final Set<String> GLOBAL_ONLY_TAGS = Collections.singleton("cores");
- final Map<String, List<Clause>> policies = new HashMap<>();
+ public static final Preference DEFAULT_PREFERENCE = new Preference((Map<String, Object>) Utils.fromJSONString("{minimize : cores, precision:1}"));
+ final Map<String, List<Clause>> policies;
final List<Clause> clusterPolicy;
final List<Preference> clusterPreferences;
final List<String> params;
+ public Policy() {
+ this(Collections.emptyMap());
+ }
public Policy(Map<String, Object> jsonMap) {
- clusterPreferences = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_PREFERENCE, emptyList())).stream()
+ List<Preference> initialClusterPreferences = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_PREFERENCES, emptyList())).stream()
.map(Preference::new)
.collect(toList());
- for (int i = 0; i < clusterPreferences.size() - 1; i++) {
- Preference preference = clusterPreferences.get(i);
- preference.next = clusterPreferences.get(i + 1);
+ for (int i = 0; i < initialClusterPreferences.size() - 1; i++) {
+ Preference preference = initialClusterPreferences.get(i);
+ preference.next = initialClusterPreferences.get(i + 1);
}
- if (clusterPreferences.isEmpty()) {
- clusterPreferences.add(new Preference((Map<String, Object>) Utils.fromJSONString("{minimize : cores, precision:1}")));
+ if (initialClusterPreferences.isEmpty()) {
+ initialClusterPreferences.add(DEFAULT_PREFERENCE);
}
- SortedSet<String> paramsOfInterest = new TreeSet<>();
+ this.clusterPreferences = Collections.unmodifiableList(initialClusterPreferences);
+ final SortedSet<String> paramsOfInterest = new TreeSet<>();
for (Preference preference : clusterPreferences) {
if (paramsOfInterest.contains(preference.name.name())) {
throw new RuntimeException(preference.name + " is repeated");
}
paramsOfInterest.add(preference.name.toString());
}
- this.params = new ArrayList<>(paramsOfInterest);
-
+ List<String> newParams = new ArrayList<>(paramsOfInterest);
clusterPolicy = ((List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_POLICY, emptyList())).stream()
.map(Clause::new)
.filter(clause -> {
- clause.addTags(params);
+ clause.addTags(newParams);
return true;
})
- .collect(Collectors.toList());
+ .collect(collectingAndThen(toList(), Collections::unmodifiableList));
- ((Map<String, List<Map<String, Object>>>) jsonMap.getOrDefault("policies", emptyMap())).forEach((s, l1) ->
- this.policies.put(s, l1.stream()
- .map(Clause::new)
- .filter(clause -> {
- if (!clause.isPerCollectiontag())
- throw new RuntimeException(clause.globalTag.name + " is only allowed in 'cluster-policy'");
- clause.addTags(params);
- return true;
- })
- .sorted()
- .collect(toList())));
+ this.policies = Collections.unmodifiableMap(
+ policiesFromMap((Map<String, List<Map<String, Object>>>)jsonMap.getOrDefault(POLICIES, emptyMap()), newParams));
+ this.params = Collections.unmodifiableList(newParams);
+
+ }
+
+ private Policy(Map<String, List<Clause>> policies, List<Clause> clusterPolicy, List<Preference> clusterPreferences,
+ List<String> params) {
+ this.policies = policies != null ? Collections.unmodifiableMap(policies) : Collections.emptyMap();
+ this.clusterPolicy = clusterPolicy != null ? Collections.unmodifiableList(clusterPolicy) : Collections.emptyList();
+ this.clusterPreferences = clusterPreferences != null ? Collections.unmodifiableList(clusterPreferences) :
+ Collections.singletonList(DEFAULT_PREFERENCE);
+ this.params = params != null ? Collections.unmodifiableList(params) : Collections.emptyList();
+ }
+
+ public Policy withPolicies(Map<String, List<Clause>> policies) {
+ return new Policy(policies, clusterPolicy, clusterPreferences, params);
+ }
+
+ public Policy withClusterPreferences(List<Preference> clusterPreferences) {
+ return new Policy(policies, clusterPolicy, clusterPreferences, params);
+ }
+
+ public Policy withClusterPolicy(List<Clause> clusterPolicy) {
+ return new Policy(policies, clusterPolicy, clusterPreferences, params);
+ }
+
+ public Policy withParams(List<String> params) {
+ return new Policy(policies, clusterPolicy, clusterPreferences, params);
}
public List<Clause> getClusterPolicy() {
@@ -123,24 +147,43 @@ public class Policy implements MapWriter {
@Override
public void writeMap(EntryWriter ew) throws IOException {
if (!policies.isEmpty()) {
- ew.put("policies", (MapWriter) ew1 -> {
+ ew.put(POLICIES, (MapWriter) ew1 -> {
for (Map.Entry<String, List<Clause>> e : policies.entrySet()) {
ew1.put(e.getKey(), e.getValue());
}
});
}
if (!clusterPreferences.isEmpty()) {
- ew.put("preferences", (IteratorWriter) iw -> {
+ ew.put(CLUSTER_PREFERENCES, (IteratorWriter) iw -> {
for (Preference p : clusterPreferences) iw.add(p);
});
}
+ if (!clusterPolicy.isEmpty()) {
+ ew.put(CLUSTER_POLICY, (IteratorWriter) iw -> {
+ for (Clause c : clusterPolicy) {
+ iw.add(c);
+ }
+ });
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Policy policy = (Policy) o;
+
+ if (!getPolicies().equals(policy.getPolicies())) return false;
+ if (!getClusterPolicy().equals(policy.getClusterPolicy())) return false;
+ if (!getClusterPreferences().equals(policy.getClusterPreferences())) return false;
+ return params.equals(policy.params);
}
/*This stores the logical state of the system, given a policy and
- * a cluster state.
- *
- */
+ * a cluster state.
+ *
+ */
public class Session implements MapWriter {
final List<String> nodes;
final ClusterDataProvider dataProvider;
@@ -461,6 +504,22 @@ public class Policy implements MapWriter {
}
+ public static Map<String, List<Clause>> policiesFromMap(Map<String, List<Map<String, Object>>> map, List<String> newParams) {
+ Map<String, List<Clause>> newPolicies = new HashMap<>();
+ map.forEach((s, l1) ->
+ newPolicies.put(s, l1.stream()
+ .map(Clause::new)
+ .filter(clause -> {
+ if (!clause.isPerCollectiontag())
+ throw new RuntimeException(clause.globalTag.name + " is only allowed in 'cluster-policy'");
+ clause.addTags(newParams);
+ return true;
+ })
+ .sorted()
+ .collect(collectingAndThen(toList(), Collections::unmodifiableList))));
+ return newPolicies;
+ }
+
public static List<Clause> mergePolicies(String coll,
List<Clause> collPolicy,
List<Clause> globalPolicy) {
@@ -499,4 +558,8 @@ public class Policy implements MapWriter {
public Map<String, List<Clause>> getPolicies() {
return policies;
}
+
+ public List<String> getParams() {
+ return params;
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index da65601..1a6f33e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -37,7 +37,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.AD
import static org.apache.solr.common.params.CoreAdminParams.NODE;
public class PolicyHelper {
- public static List<ReplicaPosition> getReplicaLocations(String collName, Map<String, Object> autoScalingJson,
+ public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
ClusterDataProvider cdp,
Map<String, String> optionalPolicyMapping,
List<String> shardNames,
@@ -73,8 +73,7 @@ public class PolicyHelper {
};
}
- Policy policy = new Policy(autoScalingJson);
- Policy.Session session = policy.createSession(cdp);
+ Policy.Session session = autoScalingConfig.getPolicy().createSession(cdp);
Map<Replica.Type, Integer> typeVsCount = new EnumMap<>(Replica.Type.class);
typeVsCount.put(Replica.Type.NRT, nrtReplicas);
typeVsCount.put(Replica.Type.TLOG, tlogReplicas);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java
index 43fec6b..29245a2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Preference.java
@@ -30,10 +30,10 @@ public class Preference implements MapWriter {
Integer precision;
final Policy.Sort sort;
Preference next;
- public int idx;
+ private int idx;
private final Map original;
- Preference(Map<String, Object> m) {
+ public Preference(Map<String, Object> m) {
this.original = Utils.getDeepCopy(m,3);
sort = Policy.Sort.get(m);
name = Policy.SortParam.get(m.get(sort.name()).toString());
@@ -86,6 +86,21 @@ public class Preference implements MapWriter {
}
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Preference that = (Preference) o;
+
+ if (idx != that.idx) return false;
+ if (getName() != that.getName()) return false;
+ if (precision != null ? !precision.equals(that.precision) : that.precision != null) return false;
+ if (sort != that.sort) return false;
+ if (next != null ? !next.equals(that.next) : that.next != null) return false;
+ return original.equals(that.original);
+ }
+
public Policy.SortParam getName() {
return name;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventProcessorStage.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventProcessorStage.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventProcessorStage.java
new file mode 100644
index 0000000..ae9c771
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventProcessorStage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+/**
+ * Enum that represents the stages of trigger event processing.
+ */
+public enum TriggerEventProcessorStage {
+ WAITING,
+ STARTED,
+ ABORTED,
+ SUCCEEDED,
+ FAILED,
+ BEFORE_ACTION,
+ AFTER_ACTION
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java
new file mode 100644
index 0000000..238d7e1
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/TriggerEventType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud.autoscaling;
+
+/**
+ * Enum that represents trigger event types.
+ */
+public enum TriggerEventType {
+ NODEADDED,
+ NODELOST,
+ REPLICALOST,
+ MANUAL,
+ SCHEDULED,
+ SEARCHRATE,
+ INDEXRATE,
+ INVALID
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/common/MapWriter.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/MapWriter.java b/solr/solrj/src/java/org/apache/solr/common/MapWriter.java
index fca6e2b..080d750 100644
--- a/solr/solrj/src/java/org/apache/solr/common/MapWriter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/MapWriter.java
@@ -20,7 +20,9 @@ package org.apache.solr.common;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -38,6 +40,25 @@ public interface MapWriter extends MapSerializable {
public EntryWriter put(String k, Object v) throws IOException {
if (v instanceof MapWriter) v = ((MapWriter) v).toMap(new LinkedHashMap<>());
if (v instanceof IteratorWriter) v = ((IteratorWriter) v).toList(new ArrayList<>());
+ if (v instanceof Iterable) {
+ List lst = new ArrayList();
+ for (Object vv : (Iterable)v) {
+ if (vv instanceof MapWriter) vv = ((MapWriter) vv).toMap(new LinkedHashMap<>());
+ if (vv instanceof IteratorWriter) vv = ((IteratorWriter) vv).toList(new ArrayList<>());
+ lst.add(vv);
+ }
+ v = lst;
+ }
+ if (v instanceof Map) {
+ Map map = new LinkedHashMap();
+ for (Map.Entry<?, ?> entry : ((Map<?, ?>)v).entrySet()) {
+ Object vv = entry.getValue();
+ if (vv instanceof MapWriter) vv = ((MapWriter) vv).toMap(new LinkedHashMap<>());
+ if (vv instanceof IteratorWriter) vv = ((IteratorWriter) vv).toList(new ArrayList<>());
+ map.put(entry.getKey(), vv);
+ }
+ v = map;
+ }
map.put(k, v);
return this;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index ff60a59..23d624b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -43,9 +44,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.common.Callable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.Pair;
@@ -158,6 +161,37 @@ public class ZkStateReader implements Closeable {
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
+ /**
+ * Get current {@link AutoScalingConfig}.
+ * @return current configuration from <code>autoscaling.json</code>. NOTE:
+ * this data is retrieved from ZK on each call.
+ */
+ public AutoScalingConfig getAutoScalingConfig() throws KeeperException, InterruptedException {
+ return getAutoScalingConfig(null);
+ }
+
+ /**
+ * Get current {@link AutoScalingConfig}.
+ * @param watcher optional {@link Watcher} to set on a znode to watch for config changes.
+ * @return current configuration from <code>autoscaling.json</code>. NOTE:
+ * this data is retrieved from ZK on each call.
+ */
+ public AutoScalingConfig getAutoScalingConfig(Watcher watcher) throws KeeperException, InterruptedException {
+ Stat stat = new Stat();
+
+ Map<String, Object> map = new HashMap<>();
+ try {
+ byte[] bytes = zkClient.getData(SOLR_AUTOSCALING_CONF_PATH, watcher, stat, true);
+ if (bytes != null && bytes.length > 0) {
+ map = (Map<String, Object>) fromJSON(bytes);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ }
+ map.put(AutoScalingParams.ZK_VERSION, stat.getVersion());
+ return new AutoScalingConfig(map);
+ }
+
private static class CollectionWatch {
int coreRefCount = 0;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
index 1b5feb9..74850ea 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/AutoScalingParams.java
@@ -38,6 +38,7 @@ public interface AutoScalingParams {
String AFTER_ACTION = "afterAction";
String TIMEOUT = "timeout";
String REMOVE_LISTENERS = "removeListeners";
+ String ZK_VERSION = "zkVersion";
// commands
String CMD_SET_TRIGGER = "set-trigger";
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/java/org/apache/solr/common/util/CommandOperation.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/CommandOperation.java b/solr/solrj/src/java/org/apache/solr/common/util/CommandOperation.java
index 50002f6..dcce396 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/CommandOperation.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/CommandOperation.java
@@ -75,7 +75,7 @@ public class CommandOperation {
//noinspection unchecked
return (Map<String, Object>) commandData;
}
- addError(StrUtils.formatString("The command ''{0}'' should have the values as a json object {key:val} format", name));
+ addError(StrUtils.formatString("The command ''{0}'' should have the values as a json object '{'key:val'}' format but is ''{1}''", name, commandData));
return Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index 1948534..1e6ae7f 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -34,7 +34,6 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause.Violation;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy.Suggester.Hint;
-import org.apache.solr.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -984,7 +983,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
};
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
- "newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
+ "newColl", new AutoScalingConfig((Map<String, Object>)Utils.fromJSONString(autoScaleJson)),
dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1, 0, 0, null);
assertTrue(locations.stream().allMatch(it -> it.node.equals("127.0.0.1:50096_solr")) );
@@ -1041,7 +1040,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
}
};
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
- "newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
+ "newColl", new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScaleJson)),
dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3,0,0, null);
assertTrue(locations.stream().allMatch(it -> ImmutableList.of("node2", "node1", "node3").contains(it.node)) );
}
[2/2] lucene-solr:feature/autoscaling: SOLR-11000: Changes made via
AutoScalingHandler should be atomic.
Posted by ab...@apache.org.
SOLR-11000: Changes made via AutoScalingHandler should be atomic.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/2590a430
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/2590a430
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/2590a430
Branch: refs/heads/feature/autoscaling
Commit: 2590a430f520c433c7ae5204fcf54065ea32e999
Parents: d2849cd
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jul 13 17:52:09 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jul 13 17:52:09 2017 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../src/java/org/apache/solr/cloud/Assign.java | 22 +-
.../apache/solr/cloud/CreateCollectionCmd.java | 4 +-
.../apache/solr/cloud/DeleteCollectionCmd.java | 3 +-
.../org/apache/solr/cloud/ZkController.java | 19 +-
.../solr/cloud/autoscaling/AutoScaling.java | 34 +-
.../cloud/autoscaling/AutoScalingConfig.java | 227 --------
.../cloud/autoscaling/AutoScalingHandler.java | 483 ++++++++---------
.../cloud/autoscaling/ComputePlanAction.java | 7 +-
.../cloud/autoscaling/HttpTriggerListener.java | 4 +-
.../cloud/autoscaling/LogTriggerListener.java | 7 +-
.../cloud/autoscaling/NodeAddedTrigger.java | 19 +-
.../solr/cloud/autoscaling/NodeLostTrigger.java | 19 +-
.../autoscaling/OverseerTriggerThread.java | 23 +-
.../cloud/autoscaling/ScheduledTriggers.java | 46 +-
.../solr/cloud/autoscaling/TriggerEvent.java | 9 +-
.../cloud/autoscaling/TriggerEventQueue.java | 3 +-
.../solr/cloud/autoscaling/TriggerListener.java | 6 +-
.../cloud/autoscaling/TriggerListenerBase.java | 1 +
...verseerCollectionConfigSetProcessorTest.java | 3 +
.../AutoAddReplicasPlanActionTest.java | 4 +-
.../autoscaling/AutoScalingHandlerTest.java | 56 +-
.../autoscaling/ComputePlanActionTest.java | 3 +-
.../autoscaling/ExecutePlanActionTest.java | 3 +-
.../cloud/autoscaling/NodeAddedTriggerTest.java | 2 +-
.../cloud/autoscaling/NodeLostTriggerTest.java | 2 +-
.../autoscaling/TriggerIntegrationTest.java | 45 +-
.../cloud/autoscaling/AutoScalingConfig.java | 523 +++++++++++++++++++
.../client/solrj/cloud/autoscaling/Clause.java | 20 +-
.../client/solrj/cloud/autoscaling/Policy.java | 121 ++++-
.../solrj/cloud/autoscaling/PolicyHelper.java | 5 +-
.../solrj/cloud/autoscaling/Preference.java | 19 +-
.../autoscaling/TriggerEventProcessorStage.java | 30 ++
.../cloud/autoscaling/TriggerEventType.java | 31 ++
.../java/org/apache/solr/common/MapWriter.java | 21 +
.../apache/solr/common/cloud/ZkStateReader.java | 34 ++
.../solr/common/params/AutoScalingParams.java | 1 +
.../solr/common/util/CommandOperation.java | 2 +-
.../solrj/cloud/autoscaling/TestPolicy.java | 5 +-
39 files changed, 1172 insertions(+), 696 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3cb1398..c0f175f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -74,6 +74,8 @@ Optimizations
* SOLR-10985: Remove unnecessary toString() calls in solr-core's search package's debug logging.
(Michael Braun via Christine Poerschke)
+* SOLR-11000: Changes made via AutoScalingHandler should be atomic. (ab, shalin)
+
Other Changes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index 266dab4..c2fa138 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -34,11 +34,11 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
@@ -63,8 +63,6 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT;
import static org.apache.solr.common.cloud.DocCollection.SNITCH;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
-
public class Assign {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -191,9 +189,9 @@ public class Assign {
int numPullReplicas) throws KeeperException, InterruptedException {
List<Map> rulesMap = (List) message.get("rule");
String policyName = message.getStr(POLICY);
- Map autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
+ AutoScalingConfig autoScalingConfig = zkStateReader.getAutoScalingConfig();
- if (rulesMap == null && policyName == null && autoScalingJson.get(Policy.CLUSTER_POLICY) == null) {
+ if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
log.debug("Identify nodes using default");
int i = 0;
List<ReplicaPosition> result = new ArrayList<>();
@@ -215,7 +213,7 @@ public class Assign {
}
}
- if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
+ if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
if (message.getStr(CREATE_NODE_SET) == null)
nodeList = Collections.emptyList();// unless explicitly specified do not pass node list to Policy
return getPositionsUsingPolicy(collectionName,
@@ -297,10 +295,10 @@ public class Assign {
replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cc, coll, createNodeList, l);
}
String policyName = coll.getStr(POLICY);
- Map autoScalingJson = Utils.getJson(cc.getZkController().getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
- if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
+ AutoScalingConfig autoScalingConfig = cc.getZkController().zkStateReader.getAutoScalingConfig();
+ if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0,
- policyName, cc.getZkController().getZkStateReader(), createNodeList);
+ policyName, cc.getZkController().zkStateReader, createNodeList);
}
if(replicaPositions != null){
@@ -326,15 +324,15 @@ public class Assign {
log.debug("shardnames {} NRT {} TLOG {} PULL {} , policy {}, nodeList {}", shardNames, nrtReplicas, tlogReplicas, pullReplicas, policyName, nodesList);
SolrClientDataProvider clientDataProvider = null;
List<ReplicaPosition> replicaPositions = null;
+ AutoScalingConfig autoScalingConfig = zkStateReader.getAutoScalingConfig();
try (CloudSolrClient csc = new CloudSolrClient.Builder()
.withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
.build()) {
clientDataProvider = new SolrClientDataProvider(csc);
- Map<String, Object> autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
Map<String, String> kvMap = Collections.singletonMap(collName, policyName);
replicaPositions = PolicyHelper.getReplicaLocations(
collName,
- autoScalingJson,
+ autoScalingConfig,
clientDataProvider,
kvMap,
shardNames,
@@ -350,7 +348,7 @@ public class Assign {
if (clientDataProvider != null) log.trace("CLUSTER_DATA_PROVIDER: " + Utils.toJSONString(clientDataProvider));
if (replicaPositions != null)
log.trace("REPLICA_POSITIONS: " + Utils.toJSONString(Utils.getDeepCopy(replicaPositions, 7, true)));
- log.trace("AUTOSCALING_JSON: " + Utils.toJSONString(Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true)));
+ log.trace("AUTOSCALING_CONF: " + Utils.toJSONString(autoScalingConfig));
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 6f1b42c..d859170 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -188,9 +188,9 @@ public class CreateCollectionCmd implements Cmd {
Map newPolicy = Utils.makeMap(REPLICA, "<" + (maxShardsPerNode + 1), SHARD, Policy.EACH, "node", Policy.ANY);
SolrQueryResponse rsp = new SolrQueryResponse();
policy = "COLL_POLICY_" + collectionName;
- ash.handleSetPolicies(null, rsp, new CommandOperation(AutoScalingParams.CMD_SET_POLICY, singletonMap(
+ ash.processOps(null, rsp, Collections.singletonList(new CommandOperation(AutoScalingParams.CMD_SET_POLICY, singletonMap(
policy
- , Collections.singletonList(newPolicy))));
+ , Collections.singletonList(newPolicy)))));
if (!"success".equals(rsp.getValues().get("result"))) {
throw new SolrException(ErrorCode.SERVER_ERROR, "unable to create new policy");
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
index f0ee458..f7b2fce 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
@@ -19,6 +19,7 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -129,7 +130,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
.getRequestHandler(AutoScalingHandler.HANDLER_PATH);
SolrQueryResponse rsp = new SolrQueryResponse();
try {
- ash.handleRemovePolicy(null, rsp, new CommandOperation(AutoScalingParams.CMD_REMOVE_POLICY, collectionSpecificPolicy));
+ ash.processOps(null, rsp, Collections.singletonList(new CommandOperation(AutoScalingParams.CMD_REMOVE_POLICY, collectionSpecificPolicy)));
} catch (SolrException e) {
if (e.getMessage().contains("No policy exists with name")) {
log.warn("The policy: " + collectionSpecificPolicy + " does not exist to be removed");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 66fa26a..3b78202 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -53,8 +53,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
-import org.apache.solr.cloud.autoscaling.AutoScaling;
-import org.apache.solr.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
@@ -179,7 +178,7 @@ public class ZkController {
private final SolrZkClient zkClient;
private final ZkCmdExecutor cmdExecutor;
- private final ZkStateReader zkStateReader;
+ public final ZkStateReader zkStateReader;
private final String zkServerAddress; // example: 127.0.0.1:54062/solr
@@ -578,16 +577,6 @@ public class ZkController {
}
/**
- *
- * @return current configuration from <code>autoscaling.json</code>. NOTE:
- * this data is retrieved from ZK on each call.
- */
- public AutoScalingConfig getAutoScalingConfig() throws KeeperException, InterruptedException {
- Map<String, Object> jsonMap = Utils.getJson(zkClient, ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true);
- return new AutoScalingConfig(jsonMap);
- }
-
- /**
* Returns config file data (in bytes)
*/
public byte[] getConfigFileData(String zkConfigName, String fileName)
@@ -839,7 +828,7 @@ public class ZkController {
// then don't create markers
boolean createNodes = false;
try {
- createNodes = getAutoScalingConfig().hasTriggerForEvents(AutoScaling.EventType.NODELOST);
+ createNodes = zkStateReader.getAutoScalingConfig().hasTriggerForEvents(TriggerEventType.NODELOST);
} catch (KeeperException | InterruptedException e1) {
log.warn("Unable to read autoscaling.json", e1);
}
@@ -938,7 +927,7 @@ public class ZkController {
List<Op> ops = new ArrayList<>(2);
ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
// if there are nodeAdded triggers don't create nodeAdded markers
- boolean createMarkerNode = getAutoScalingConfig().hasTriggerForEvents(AutoScaling.EventType.NODEADDED);
+ boolean createMarkerNode = zkStateReader.getAutoScalingConfig().hasTriggerForEvents(TriggerEventType.NODEADDED);
if (createMarkerNode && !zkClient.exists(nodeAddedPath, true)) {
// use EPHEMERAL so that it disappears if this node goes down
// and no other action is taken
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index dcf455e..c25b00e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -24,33 +24,15 @@ import java.util.Map;
import com.google.common.base.Preconditions;
import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.core.CoreContainer;
public class AutoScaling {
- public enum EventType {
- NODEADDED,
- NODELOST,
- REPLICALOST,
- MANUAL,
- SCHEDULED,
- SEARCHRATE,
- INDEXRATE
- }
-
- public enum EventProcessorStage {
- STARTED,
- ABORTED,
- SUCCEEDED,
- FAILED,
- BEFORE_ACTION,
- AFTER_ACTION
- }
-
/**
* Implementation of this interface is used for processing events generated by a trigger.
*/
- public interface EventProcessor {
+ public interface TriggerEventProcessor {
/**
* This method is executed for events produced by {@link Trigger#run()}.
@@ -65,7 +47,7 @@ public class AutoScaling {
/**
* Interface for a Solr trigger. Each trigger implements Runnable and Closeable interface. A trigger
* is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so it is executed as
- * per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setProcessor(EventProcessor)}
+ * per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setProcessor(TriggerEventProcessor)}
* method should be used to set a processor which is used by implementation of this class whenever
* ready.
* <p>
@@ -75,7 +57,7 @@ public class AutoScaling {
* which can be get/set by a different thread than the one executing the trigger. Therefore, implementations
* should use appropriate synchronization around the listener.
* <p>
- * When a trigger is ready to fire, it calls the {@link EventProcessor#process(TriggerEvent)} event
+ * When a trigger is ready to fire, it calls the {@link TriggerEventProcessor#process(TriggerEvent)} event
* with the proper trigger event object. If that method returns false then it should be interpreted to mean
* that Solr is not ready to process this trigger event and therefore we should retain the state and fire
* at the next invocation of the run() method.
@@ -89,7 +71,7 @@ public class AutoScaling {
/**
* Event type generated by this trigger.
*/
- EventType getEventType();
+ TriggerEventType getEventType();
/** Returns true if this trigger is enabled. */
boolean isEnabled();
@@ -104,10 +86,10 @@ public class AutoScaling {
List<TriggerAction> getActions();
/** Set event processor to call when event is fired. */
- void setProcessor(EventProcessor processor);
+ void setProcessor(TriggerEventProcessor processor);
/** Get event processor. */
- EventProcessor getProcessor();
+ TriggerEventProcessor getProcessor();
/** Return true when this trigger is closed and cannot be used. */
boolean isClosed();
@@ -139,7 +121,7 @@ public class AutoScaling {
this.coreContainer = coreContainer;
}
- public synchronized Trigger create(EventType type, String name, Map<String, Object> props) {
+ public synchronized Trigger create(TriggerEventType type, String name, Map<String, Object> props) {
if (isClosed) {
throw new AlreadyClosedException("TriggerFactory has already been closed, cannot create new triggers");
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
deleted file mode 100644
index 54e9170..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
-import org.apache.solr.common.params.AutoScalingParams;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple bean representation of <code>autoscaling.json</code>, which parses data
- * lazily.
- */
-public class AutoScalingConfig {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final Map<String, Object> jsonMap;
-
- private Policy policy;
- private Map<String, TriggerConfig> triggers;
- private Map<String, TriggerListenerConfig> listeners;
-
- /**
- * Bean representation of {@link TriggerListener} config.
- */
- public static class TriggerListenerConfig {
- public final String name;
- public final String trigger;
- public final EnumSet<AutoScaling.EventProcessorStage> stages = EnumSet.noneOf(AutoScaling.EventProcessorStage.class);
- public final String listenerClass;
- public final Set<String> beforeActions;
- public final Set<String> afterActions;
- public final Map<String, Object> properties = new HashMap<>();
-
- public TriggerListenerConfig(String name, Map<String, Object> properties) {
- this.name = name;
- this.properties.putAll(properties);
- trigger = (String)properties.get(AutoScalingParams.TRIGGER);
- List<String> stageNames = getList(AutoScalingParams.STAGE, properties);
- for (String stageName : stageNames) {
- try {
- AutoScaling.EventProcessorStage stage = AutoScaling.EventProcessorStage.valueOf(stageName.toUpperCase(Locale.ROOT));
- stages.add(stage);
- } catch (Exception e) {
- LOG.warn("Invalid stage name '" + name + "' in listener config, skipping: " + properties);
- }
- }
- listenerClass = (String)properties.get(AutoScalingParams.CLASS);
- beforeActions = new HashSet<>(getList(AutoScalingParams.BEFORE_ACTION, properties));
- afterActions = new HashSet<>(getList(AutoScalingParams.AFTER_ACTION, properties));
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TriggerListenerConfig that = (TriggerListenerConfig) o;
-
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
- if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
- if (!stages.equals(that.stages)) return false;
- if (listenerClass != null ? !listenerClass.equals(that.listenerClass) : that.listenerClass != null) return false;
- if (!beforeActions.equals(that.beforeActions)) return false;
- if (!afterActions.equals(that.afterActions)) return false;
- return properties.equals(that.properties);
- }
- }
-
- /**
- * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
- */
- public static class TriggerConfig {
- public final String name;
- public final AutoScaling.EventType eventType;
- public final Map<String, Object> properties = new HashMap<>();
-
- public TriggerConfig(String name, Map<String, Object> properties) {
- this.name = name;
- String event = (String) properties.get(AutoScalingParams.EVENT);
- this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
- this.properties.putAll(properties);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TriggerConfig that = (TriggerConfig) o;
-
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
- if (eventType != that.eventType) return false;
- return properties.equals(that.properties);
- }
- }
-
- public AutoScalingConfig(byte[] utf8) {
- this(utf8 != null && utf8.length > 0 ? (Map<String, Object>)Utils.fromJSON(utf8) : Collections.emptyMap());
- }
-
- /**
- * Construct from a JSON map representation.
- * @param jsonMap JSON map representation of the config.
- */
- public AutoScalingConfig(Map<String, Object> jsonMap) {
- this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
- }
-
- /**
- * Return the original JSON map representation that was used for building this config.
- */
- public Map<String, Object> getJsonMap() {
- return jsonMap;
- }
-
- /**
- * Get {@link Policy} configuration.
- */
- public Policy getPolicy() {
- if (policy == null) {
- policy = new Policy(jsonMap);
- }
- return policy;
- }
-
- /**
- * Get trigger configurations.
- */
- public Map<String, TriggerConfig> getTriggerConfigs() {
- if (triggers == null) {
- Map<String, Object> trigMap = (Map<String, Object>)jsonMap.get("triggers");
- if (trigMap == null) {
- triggers = Collections.emptyMap();
- } else {
- triggers = new HashMap<>(trigMap.size());
- for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
- triggers.put(entry.getKey(), new TriggerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
- }
- }
- }
- return triggers;
- }
-
- /**
- * Check whether triggers for specific event type exist.
- * @param types list of event types
- * @return true if there's at least one trigger matching at least one event type,
- * false otherwise,
- */
- public boolean hasTriggerForEvents(AutoScaling.EventType... types) {
- if (types == null || types.length == 0) {
- return false;
- }
- for (TriggerConfig config : getTriggerConfigs().values()) {
- for (AutoScaling.EventType type : types) {
- if (config.eventType.equals(type)) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Get listener configurations.
- */
- public Map<String, TriggerListenerConfig> getTriggerListenerConfigs() {
- if (listeners == null) {
- Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
- if (map == null) {
- listeners = Collections.emptyMap();
- } else {
- listeners = new HashMap<>(map.size());
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- listeners.put(entry.getKey(), new TriggerListenerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
- }
- }
- }
- return listeners;
- }
-
- private static List<String> getList(String key, Map<String, Object> properties) {
- return getList(key, properties, null);
- }
-
- private static List<String> getList(String key, Map<String, Object> properties, List<String> defaultList) {
- if (defaultList == null) {
- defaultList = Collections.emptyList();
- }
- Object o = properties.get(key);
- if (o == null) {
- return defaultList;
- }
- if (o instanceof List) {
- return (List)o;
- } else {
- return Collections.singletonList(String.valueOf(o));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
index ebec3f6..83d8663 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
@@ -30,19 +30,23 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
-import com.google.common.collect.ImmutableSet;
import org.apache.solr.api.Api;
import org.apache.solr.api.ApiBag;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Cell;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.Preference;
import org.apache.solr.client.solrj.cloud.autoscaling.Row;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommandOperation;
import org.apache.solr.common.util.StrUtils;
@@ -57,10 +61,11 @@ import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.solr.util.TimeSource;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
import static org.apache.solr.common.params.CommonParams.JSON;
import static org.apache.solr.common.params.AutoScalingParams.*;
@@ -73,7 +78,8 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final CoreContainer container;
private final List<Map<String, String>> DEFAULT_ACTIONS = new ArrayList<>(3);
- private static ImmutableSet<String> singletonCommands = ImmutableSet.of("set-cluster-preferences", "set-cluster-policy");
+ private static Set<String> singletonCommands = Stream.of("set-cluster-preferences", "set-cluster-policy")
+ .collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
@@ -110,11 +116,18 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown path: " + path);
}
- Map<String, Object> map = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
+ AutoScalingConfig autoScalingConf = container.getZkController().zkStateReader.getAutoScalingConfig();
if (parts.size() == 2) {
- rsp.getValues().addAll(map);
+ autoScalingConf.writeMap(new MapWriter.EntryWriter() {
+
+ @Override
+ public MapWriter.EntryWriter put(String k, Object v) throws IOException {
+ rsp.getValues().add(k, v);
+ return this;
+ }
+ });
} else if (parts.size() == 3 && DIAGNOSTICS.equals(parts.get(2))) {
- handleDiagnostics(rsp, map);
+ handleDiagnostics(rsp, autoScalingConf);
}
} else {
if (req.getContentStreams() == null) {
@@ -125,42 +138,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
// errors have already been added to the response so there's nothing left to do
return;
}
- for (CommandOperation op : ops) {
- switch (op.name) {
- case CMD_SET_TRIGGER:
- handleSetTrigger(req, rsp, op);
- break;
- case CMD_REMOVE_TRIGGER:
- handleRemoveTrigger(req, rsp, op);
- break;
- case CMD_SET_LISTENER:
- handleSetListener(req, rsp, op);
- break;
- case CMD_REMOVE_LISTENER:
- handleRemoveListener(req, rsp, op);
- break;
- case CMD_SUSPEND_TRIGGER:
- handleSuspendTrigger(req, rsp, op);
- break;
- case CMD_RESUME_TRIGGER:
- handleResumeTrigger(req, rsp, op);
- break;
- case CMD_SET_POLICY:
- handleSetPolicies(req, rsp, op);
- break;
- case CMD_REMOVE_POLICY:
- handleRemovePolicy(req, rsp, op);
- break;
- case CMD_SET_CLUSTER_PREFERENCES:
- handleSetClusterPreferences(req, rsp, op);
- break;
- case CMD_SET_CLUSTER_POLICY:
- handleSetClusterPolicy(req, rsp, op);
- break;
- default:
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + op.name);
- }
- }
+ processOps(req, rsp, ops);
}
} catch (Exception e) {
rsp.getValues().add("result", "failure");
@@ -170,8 +148,65 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
}
- private void handleDiagnostics(SolrQueryResponse rsp, Map<String, Object> autoScalingConf) throws IOException {
- Policy policy = new Policy(autoScalingConf);
+ public void processOps(SolrQueryRequest req, SolrQueryResponse rsp, List<CommandOperation> ops) throws KeeperException, InterruptedException, IOException {
+ while (true) {
+ AutoScalingConfig initialConfig = container.getZkController().zkStateReader.getAutoScalingConfig();
+ AutoScalingConfig currentConfig = initialConfig;
+ for (CommandOperation op : ops) {
+ switch (op.name) {
+ case CMD_SET_TRIGGER:
+ currentConfig = handleSetTrigger(req, rsp, op, currentConfig);
+ break;
+ case CMD_REMOVE_TRIGGER:
+ currentConfig = handleRemoveTrigger(req, rsp, op, currentConfig);
+ break;
+ case CMD_SET_LISTENER:
+ currentConfig = handleSetListener(req, rsp, op, currentConfig);
+ break;
+ case CMD_REMOVE_LISTENER:
+ currentConfig = handleRemoveListener(req, rsp, op, currentConfig);
+ break;
+ case CMD_SUSPEND_TRIGGER:
+ currentConfig = handleSuspendTrigger(req, rsp, op, currentConfig);
+ break;
+ case CMD_RESUME_TRIGGER:
+ currentConfig = handleResumeTrigger(req, rsp, op, currentConfig);
+ break;
+ case CMD_SET_POLICY:
+ currentConfig = handleSetPolicies(req, rsp, op, currentConfig);
+ break;
+ case CMD_REMOVE_POLICY:
+ currentConfig = handleRemovePolicy(req, rsp, op, currentConfig);
+ break;
+ case CMD_SET_CLUSTER_PREFERENCES:
+ currentConfig = handleSetClusterPreferences(req, rsp, op, currentConfig);
+ break;
+ case CMD_SET_CLUSTER_POLICY:
+ currentConfig = handleSetClusterPolicy(req, rsp, op, currentConfig);
+ break;
+ default:
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + op.name);
+ }
+ }
+ if (!currentConfig.equals(initialConfig)) {
+ // update in ZK
+ if (zkSetAutoScalingConfig(container.getZkController().getZkStateReader(), currentConfig)) {
+ break;
+ } else {
+ // someone else updated the config, get the latest one and re-apply our ops
+ rsp.getValues().add("retry", "initialVersion=" + initialConfig.getZkVersion());
+ continue;
+ }
+ } else {
+ // no changes
+ break;
+ }
+ }
+ rsp.getValues().add("result", "success");
+ }
+
+ private void handleDiagnostics(SolrQueryResponse rsp, AutoScalingConfig autoScalingConf) throws IOException {
+ Policy policy = autoScalingConf.getPolicy();
try (CloudSolrClient build = new CloudSolrClient.Builder()
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
@@ -204,91 +239,103 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
}
- private void handleSetClusterPolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
- List clusterPolicy = (List) op.getCommandData();
+ private AutoScalingConfig handleSetClusterPolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
+ List<Map<String, Object>> clusterPolicy = (List<Map<String, Object>>) op.getCommandData();
if (clusterPolicy == null || !(clusterPolicy instanceof List)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A list of cluster policies was not found");
}
- zkSetClusterPolicy(container.getZkController().getZkStateReader(), clusterPolicy);
- rsp.getValues().add("result", "success");
+ List<Clause> cp = clusterPolicy.stream().map(Clause::new).collect(Collectors.toList());
+ Policy p = currentConfig.getPolicy().withClusterPolicy(cp);
+ currentConfig = currentConfig.withPolicy(p);
+ return currentConfig;
}
- private void handleSetClusterPreferences(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
- List preferences = (List) op.getCommandData();
+ private AutoScalingConfig handleSetClusterPreferences(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
+ List<Map<String, Object>> preferences = (List<Map<String, Object>>) op.getCommandData();
if (preferences == null || !(preferences instanceof List)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A list of cluster preferences not found");
}
- zkSetPreferences(container.getZkController().getZkStateReader(), preferences);
- rsp.getValues().add("result", "success");
+ List<Preference> prefs = preferences.stream().map(Preference::new).collect(Collectors.toList());
+ Policy p = currentConfig.getPolicy().withClusterPreferences(prefs);
+ currentConfig = currentConfig.withPolicy(p);
+ return currentConfig;
}
- public void handleRemovePolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op)
- throws KeeperException, InterruptedException, IOException {
+ private AutoScalingConfig handleRemovePolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
String policyName = (String) op.getCommandData();
if (policyName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The policy name cannot be empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> policies = (Map<String, Object>) autoScalingConf.get("policies");
+ Map<String, List<Clause>> policies = currentConfig.getPolicy().getPolicies();
if (policies == null || !policies.containsKey(policyName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No policy exists with name: " + policyName);
}
-
- zkSetPolicies(container.getZkController().getZkStateReader(), policyName, null);
- rsp.getValues().add("result", "success");
+ policies = new HashMap<>(policies);
+ policies.remove(policyName);
+ Policy p = currentConfig.getPolicy().withPolicies(policies);
+ currentConfig = currentConfig.withPolicy(p);
+ return currentConfig;
}
- public void handleSetPolicies(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
- Map<String, Object> policies = op.getDataMap();
- for (Map.Entry<String, Object> policy : policies.entrySet()) {
+ private AutoScalingConfig handleSetPolicies(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
+ Map<String, Object> policiesMap = op.getDataMap();
+ for (Map.Entry<String, Object> policy : policiesMap.entrySet()) {
String policyName = policy.getKey();
if (policyName == null || policyName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The policy name cannot be null or empty");
}
}
-
- zkSetPolicies(container.getZkController().getZkStateReader(), null, policies);
-
- rsp.getValues().add("result", "success");
+ List<String> params = new ArrayList<>(currentConfig.getPolicy().getParams());
+ Map<String, List<Clause>> mergedPolicies = new HashMap<>(currentConfig.getPolicy().getPolicies());
+ Map<String, List<Clause>> newPolicies = Policy.policiesFromMap((Map<String, List<Map<String, Object>>>)op.getCommandData(),
+ params);
+ mergedPolicies.putAll(newPolicies);
+ Policy p = currentConfig.getPolicy().withPolicies(mergedPolicies).withParams(params);
+ currentConfig = currentConfig.withPolicy(p);
+ return currentConfig;
}
- private void handleResumeTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleResumeTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
+ Map<String, AutoScalingConfig.TriggerConfig> triggers = currentConfig.getTriggerConfigs();
Set<String> changed = new HashSet<>();
- if (triggers == null) {
- if (Policy.EACH.equals(triggerName)) {
- // no harm no foul
- } else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- } else {
- if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- for (Map.Entry<String, Object> entry : triggers.entrySet()) {
- if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
- Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
- Boolean enabled = (Boolean)triggerProps.get(ENABLED);
- if (enabled != null && !enabled) {
- triggerProps.put(ENABLED, true);
- zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
- changed.add(entry.getKey());
- }
+ if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+ }
+ Map<String, AutoScalingConfig.TriggerConfig> newTriggers = new HashMap<>();
+ for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
+ if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
+ AutoScalingConfig.TriggerConfig trigger = entry.getValue();
+ if (!trigger.enabled) {
+ trigger = trigger.withEnabled(true);
+ newTriggers.put(entry.getKey(), trigger);
+ changed.add(entry.getKey());
+ } else {
+ newTriggers.put(entry.getKey(), entry.getValue());
}
+ } else {
+ newTriggers.put(entry.getKey(), entry.getValue());
}
}
rsp.getValues().add("changed", changed);
- rsp.getValues().add("result", "success");
+ if (!changed.isEmpty()) {
+ currentConfig = currentConfig.withTriggerConfigs(newTriggers);
+ }
+ return currentConfig;
}
- private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
@@ -307,55 +354,54 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
+ Map<String, AutoScalingConfig.TriggerConfig> triggers = currentConfig.getTriggerConfigs();
Set<String> changed = new HashSet<>();
- if (triggers == null) {
- if (Policy.EACH.equals(triggerName)) {
- // no harm no foul
- } else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- } else {
- if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- for (Map.Entry<String, Object> entry : triggers.entrySet()) {
- if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
- Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
- Boolean enabled = (Boolean)triggerProps.get(ENABLED);
- if (enabled == null || enabled) {
- triggerProps.put(ENABLED, false);
- if (resumeTime != null) {
- triggerProps.put(RESUME_AT, resumeTime.getTime());
- }
- zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
- changed.add(entry.getKey());
+ if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+ }
+ Map<String, AutoScalingConfig.TriggerConfig> newTriggers = new HashMap<>();
+ for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
+ if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
+ AutoScalingConfig.TriggerConfig trigger = entry.getValue();
+ if (trigger.enabled) {
+ trigger = trigger.withEnabled(false);
+ if (resumeTime != null) {
+ trigger = trigger.withProperty(RESUME_AT, resumeTime.getTime());
}
+ newTriggers.put(entry.getKey(), trigger);
+ changed.add(trigger.name);
+ } else {
+ newTriggers.put(entry.getKey(), entry.getValue());
}
+ } else {
+ newTriggers.put(entry.getKey(), entry.getValue());
}
}
rsp.getValues().add("changed", changed);
- rsp.getValues().add("result", "success");
+ if (!changed.isEmpty()) {
+ currentConfig = currentConfig.withTriggerConfigs(newTriggers);
+ }
+ return currentConfig;
}
- private void handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String listenerName = op.getStr(NAME);
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> listeners = (Map<String, Object>) autoScalingConf.get("listeners");
+ Map<String, AutoScalingConfig.TriggerListenerConfig> listeners = currentConfig.getTriggerListenerConfigs();
if (listeners == null || !listeners.containsKey(listenerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No listener exists with name: " + listenerName);
}
- zkSetListener(container.getZkController().getZkStateReader(), listenerName, null);
- rsp.getValues().add("result", "success");
+ currentConfig = currentConfig.withoutTriggerListenerConfig(listenerName);
+ return currentConfig;
}
- private void handleSetListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleSetListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String listenerName = op.getStr(NAME);
String triggerName = op.getStr(TRIGGER);
List<String> stageNames = op.getStrs(STAGE, Collections.emptyList());
@@ -367,12 +413,11 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
+ Map<String, AutoScalingConfig.TriggerConfig> triggers = currentConfig.getTriggerConfigs();
if (triggers == null || !triggers.containsKey(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A trigger with the name " + triggerName + " does not exist");
}
- Map<String, Object> triggerProps = (Map<String, Object>) triggers.get(triggerName);
+ AutoScalingConfig.TriggerConfig triggerConfig = triggers.get(triggerName);
if (stageNames.isEmpty() && beforeActions.isEmpty() && afterActions.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Either 'stage' or 'beforeAction' or 'afterAction' must be specified");
@@ -380,7 +425,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
for (String stage : stageNames) {
try {
- AutoScaling.EventProcessorStage.valueOf(stage);
+ TriggerEventProcessorStage.valueOf(stage);
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid stage name: " + stage);
}
@@ -397,47 +442,25 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Listener not found: " + listenerClass, e);
}
- List<Map<String, String>> actions = (List<Map<String, String>>) triggerProps.get("actions");
Set<String> actionNames = new HashSet<>();
actionNames.addAll(beforeActions);
actionNames.addAll(afterActions);
- for (Map<String, String> action : actions) {
- actionNames.remove(action.get(NAME));
+ for (AutoScalingConfig.ActionConfig action : triggerConfig.actions) {
+ actionNames.remove(action.name);
}
if (!actionNames.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger '" + triggerName + "' does not have actions named: " + actionNames);
}
-
+ AutoScalingConfig.TriggerListenerConfig listener = new AutoScalingConfig.TriggerListenerConfig(listenerName, op.getValuesExcluding("name"));
// todo - handle races between competing set-trigger and set-listener invocations
- zkSetListener(container.getZkController().getZkStateReader(), listenerName, op.getValuesExcluding("name"));
- rsp.getValues().add("result", "success");
- }
-
- private void zkSetListener(ZkStateReader reader, String listenerName, Map<String, Object> listenerProperties) throws KeeperException, InterruptedException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- Map<String, Object> listeners = (Map<String, Object>) loaded.get("listeners");
- if (listeners == null) listeners = new HashMap<>(1);
- if (listenerProperties != null) {
- listeners.put(listenerName, listenerProperties);
- } else {
- listeners.remove(listenerName);
- }
- loaded = loaded.plus("listeners", listeners);
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
- }
- break;
- }
+ currentConfig = currentConfig.withTriggerListenerConfig(listener);
+ return currentConfig;
}
- private void handleSetTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleSetTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
+ // we're going to modify the op - use a copy
+ op = new CommandOperation(op.name, Utils.getDeepCopy((Map)op.getCommandData(), 10));
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
@@ -448,7 +471,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
if (eventTypeStr == null || eventTypeStr.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The event type cannot be null or empty in trigger: " + triggerName);
}
- AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(eventTypeStr.trim().toUpperCase(Locale.ROOT));
+ TriggerEventType eventType = TriggerEventType.valueOf(eventTypeStr.trim().toUpperCase(Locale.ROOT));
String waitForStr = op.getStr(WAIT_FOR, null);
if (waitForStr != null) {
@@ -456,7 +479,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
try {
seconds = parseHumanTime(waitForStr);
} catch (IllegalArgumentException e) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'waitFor' value in trigger: " + triggerName);
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid 'waitFor' value '" + waitForStr + "' in trigger: " + triggerName);
}
op.getDataMap().put(WAIT_FOR, seconds);
}
@@ -483,9 +506,9 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action not found: " + klass, e);
}
}
-
- zkSetTrigger(container.getZkController().getZkStateReader(), triggerName, op.getValuesExcluding("name"));
- rsp.getValues().add("result", "success");
+ AutoScalingConfig.TriggerConfig trigger = new AutoScalingConfig.TriggerConfig(triggerName, op.getValuesExcluding("name"));
+ currentConfig = currentConfig.withTriggerConfig(trigger);
+ return currentConfig;
}
private int parseHumanTime(String timeStr) {
@@ -508,143 +531,61 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
return seconds;
}
- private void handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String triggerName = op.getStr(NAME);
boolean removeListeners = op.getBoolean(REMOVE_LISTENERS, false);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
- if (triggers == null || !triggers.containsKey(triggerName)) {
+ Map<String, AutoScalingConfig.TriggerConfig> triggerConfigs = currentConfig.getTriggerConfigs();
+ if (!triggerConfigs.containsKey(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
-
- Map<String, Map<String, Object>> listeners = (Map<String, Map<String, Object>>) autoScalingConf.get("listeners");
+ triggerConfigs = new HashMap<>(triggerConfigs);
Set<String> activeListeners = new HashSet<>();
- if (listeners != null) {
- for (Map.Entry<String, Map<String, Object>> entry : listeners.entrySet()) {
- Map<String, Object> listenerProps = entry.getValue();
- if (triggerName.equals(listenerProps.get(TRIGGER)) && !removeListeners) {
- activeListeners.add(entry.getKey());
- }
- }
- }
- if (removeListeners) {
- for (String activeListener : activeListeners) {
- zkSetListener(container.getZkController().getZkStateReader(), activeListener, null);
- }
- } else if (!activeListeners.isEmpty()) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Cannot remove trigger: " + triggerName + " because it has active listeners: " + activeListeners);
- }
-
- zkSetTrigger(container.getZkController().getZkStateReader(), triggerName, null);
- rsp.getValues().add("result", "success");
- }
-
- private void zkSetTrigger(ZkStateReader reader, String triggerName, Map<String, Object> triggerProperties) throws KeeperException, InterruptedException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- Map<String, Object> triggers = (Map<String, Object>) loaded.get("triggers");
- if (triggers == null) triggers = new HashMap<>(1);
- if (triggerProperties != null) {
- triggers.put(triggerName, triggerProperties);
- } else {
- triggers.remove(triggerName);
- }
- loaded = loaded.plus("triggers", triggers);
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
+ Map<String, AutoScalingConfig.TriggerListenerConfig> listeners = currentConfig.getTriggerListenerConfigs();
+ for (AutoScalingConfig.TriggerListenerConfig listener : listeners.values()) {
+ if (triggerName.equals(listener.trigger)) {
+ activeListeners.add(listener.name);
}
- break;
}
- }
-
- private void zkSetPolicies(ZkStateReader reader, String policyBeRemoved, Map<String, Object> newPolicies) throws KeeperException, InterruptedException, IOException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- Map<String, Object> policies = (Map<String, Object>) loaded.get("policies");
- if (policies == null) policies = new HashMap<>(1);
- if (newPolicies != null) {
- policies.putAll(newPolicies);
+ if (!activeListeners.isEmpty()) {
+ if (removeListeners) {
+ listeners = new HashMap<>(listeners);
+ listeners.keySet().removeAll(activeListeners);
} else {
- policies.remove(policyBeRemoved);
- }
- loaded = loaded.plus("policies", policies);
- verifyAutoScalingConf(loaded.getProperties());
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Cannot remove trigger: " + triggerName + " because it has active listeners: " + activeListeners);
}
- break;
}
+ triggerConfigs.remove(triggerName);
+ currentConfig = currentConfig.withTriggerConfigs(triggerConfigs).withTriggerListenerConfigs(listeners);
+ return currentConfig;
}
- private void zkSetPreferences(ZkStateReader reader, List preferences) throws KeeperException, InterruptedException, IOException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- loaded = loaded.plus("cluster-preferences", preferences);
- verifyAutoScalingConf(loaded.getProperties());
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
- }
- break;
- }
- }
- private void zkSetClusterPolicy(ZkStateReader reader, List clusterPolicy) throws KeeperException, InterruptedException, IOException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- loaded = loaded.plus("cluster-policy", clusterPolicy);
- verifyAutoScalingConf(loaded.getProperties());
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
- }
- break;
+ private boolean zkSetAutoScalingConfig(ZkStateReader reader, AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
+ verifyAutoScalingConf(currentConfig);
+ try {
+ reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(currentConfig), currentConfig.getZkVersion(), true);
+ } catch (KeeperException.BadVersionException bve) {
+ // somebody else has changed the configuration so we must retry
+ return false;
}
+ return true;
}
- private void verifyAutoScalingConf(Map<String, Object> autoScalingConf) throws IOException {
+ private void verifyAutoScalingConf(AutoScalingConfig autoScalingConf) throws IOException {
try (CloudSolrClient build = new CloudSolrClient.Builder()
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
- Policy policy = new Policy(autoScalingConf);
- Policy.Session session = policy.createSession(new SolrClientDataProvider(build));
+ Policy.Session session = autoScalingConf.getPolicy().createSession(new SolrClientDataProvider(build));
log.debug("Verified autoscaling configuration");
}
}
- private Map<String, Object> zkReadAutoScalingConf(ZkStateReader reader) throws KeeperException, InterruptedException {
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
- ZkNodeProps loaded = ZkNodeProps.load(data);
- return loaded.getProperties();
- }
-
@Override
public String getDescription() {
return "A handler for autoscaling configuration";
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index cfd9ca3..473762e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -24,12 +24,12 @@ import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -55,13 +55,12 @@ public class ComputePlanAction extends TriggerActionBase {
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.build()) {
ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
- Map<String, Object> autoScalingConf = Utils.getJson(zkStateReader.getZkClient(), ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true);
+ AutoScalingConfig autoScalingConf = zkStateReader.getAutoScalingConfig();
if (autoScalingConf.isEmpty()) {
log.error("Action: " + getName() + " executed but no policy is configured");
return;
}
- AutoScalingConfig config = new AutoScalingConfig(autoScalingConf);
- Policy policy = config.getPolicy();
+ Policy policy = autoScalingConf.getPolicy();
Policy.Session session = policy.createSession(new SolrClientDataProvider(cloudSolrClient));
Policy.Suggester suggester = getSuggester(session, event, zkStateReader);
while (true) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
index e50417f..2af4f30 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
@@ -29,6 +29,8 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
@@ -86,7 +88,7 @@ public class HttpTriggerListener extends TriggerListenerBase {
}
@Override
- public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) {
+ public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) {
Properties properties = new Properties();
properties.setProperty("stage", stage.toString());
// if configuration used "actionName" but we're in a non-action related stage then PropertiesUtil will
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
index 108f41b..736d946 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
@@ -17,10 +17,9 @@
package org.apache.solr.cloud.autoscaling;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import org.apache.solr.core.CoreContainer;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +31,8 @@ public class LogTriggerListener extends TriggerListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
- public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
- Throwable error, String message) {
+ public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context,
+ Throwable error, String message) {
LOG.info("{}: stage={}, actionName={}, event={}, error={}, messsage={}", config.name, stage, actionName, event, error, message);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index c51e586..2197dd0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
@@ -43,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Trigger for the {@link org.apache.solr.cloud.autoscaling.AutoScaling.EventType#NODEADDED} event
+ * Trigger for the {@link TriggerEventType#NODEADDED} event
*/
public class NodeAddedTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -52,10 +53,10 @@ public class NodeAddedTrigger extends TriggerBase {
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
- private final AtomicReference<AutoScaling.EventProcessor> processorRef;
+ private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
- private final AutoScaling.EventType eventType;
+ private final TriggerEventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
@@ -86,7 +87,7 @@ public class NodeAddedTrigger extends TriggerBase {
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = (boolean) properties.getOrDefault("enabled", true);
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
- this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
+ this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
}
@@ -119,12 +120,12 @@ public class NodeAddedTrigger extends TriggerBase {
}
@Override
- public void setProcessor(AutoScaling.EventProcessor processor) {
+ public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
- public AutoScaling.EventProcessor getProcessor() {
+ public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@@ -134,7 +135,7 @@ public class NodeAddedTrigger extends TriggerBase {
}
@Override
- public AutoScaling.EventType getEventType() {
+ public TriggerEventType getEventType() {
return eventType;
}
@@ -254,7 +255,7 @@ public class NodeAddedTrigger extends TriggerBase {
long now = timeSource.getTime();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
- AutoScaling.EventProcessor processor = processorRef.get();
+ AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (processor != null) {
log.debug("NodeAddedTrigger {} firing registered processor for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
if (processor.process(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
@@ -297,7 +298,7 @@ public class NodeAddedTrigger extends TriggerBase {
public static class NodeAddedEvent extends TriggerEvent {
- public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedTime, String nodeAdded) {
+ public NodeAddedEvent(TriggerEventType eventType, String source, long nodeAddedTime, String nodeAdded) {
super(eventType, source, nodeAddedTime, Collections.singletonMap(NODE_NAME, nodeAdded));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index caf051b..fba1f3c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
@@ -43,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Trigger for the {@link AutoScaling.EventType#NODELOST} event
+ * Trigger for the {@link TriggerEventType#NODELOST} event
*/
public class NodeLostTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -52,10 +53,10 @@ public class NodeLostTrigger extends TriggerBase {
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
- private final AtomicReference<AutoScaling.EventProcessor> processorRef;
+ private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
- private final AutoScaling.EventType eventType;
+ private final TriggerEventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
@@ -86,7 +87,7 @@ public class NodeLostTrigger extends TriggerBase {
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = (boolean) properties.getOrDefault("enabled", true);
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
- this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
+ this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
}
@Override
@@ -117,12 +118,12 @@ public class NodeLostTrigger extends TriggerBase {
}
@Override
- public void setProcessor(AutoScaling.EventProcessor processor) {
+ public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
- public AutoScaling.EventProcessor getProcessor() {
+ public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@@ -132,7 +133,7 @@ public class NodeLostTrigger extends TriggerBase {
}
@Override
- public AutoScaling.EventType getEventType() {
+ public TriggerEventType getEventType() {
return eventType;
}
@@ -249,7 +250,7 @@ public class NodeLostTrigger extends TriggerBase {
Long timeRemoved = entry.getValue();
if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
- AutoScaling.EventProcessor processor = processorRef.get();
+ AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (processor != null) {
log.debug("NodeLostTrigger firing registered processor for lost node: {}", nodeName);
if (processor.process(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
@@ -292,7 +293,7 @@ public class NodeLostTrigger extends TriggerBase {
public static class NodeLostEvent extends TriggerEvent {
- public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostTime, String nodeRemoved) {
+ public NodeLostEvent(TriggerEventType eventType, String source, long nodeLostTime, String nodeRemoved) {
super(eventType, source, nodeLostTime, Collections.singletonMap(NODE_NAME, nodeRemoved));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2590a430/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 3666e1b..ca9bb2e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -23,21 +23,19 @@ import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -182,10 +180,10 @@ public class OverseerTriggerThread implements Runnable, Closeable {
boolean cleanOldNodeAddedMarkers = true;
// add new triggers and/or replace and close the replaced triggers
for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
- if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODELOST)) {
+ if (entry.getValue().getEventType().equals(TriggerEventType.NODELOST)) {
cleanOldNodeLostMarkers = false;
}
- if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODEADDED)) {
+ if (entry.getValue().getEventType().equals(TriggerEventType.NODEADDED)) {
cleanOldNodeAddedMarkers = false;
}
scheduledTriggers.add(entry.getValue());
@@ -264,15 +262,14 @@ public class OverseerTriggerThread implements Runnable, Closeable {
if (isClosed) {
return;
}
- final Stat stat = new Stat();
- final byte[] data = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, watcher, stat, true);
- log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, stat.getVersion());
- if (znodeVersion >= stat.getVersion()) {
+ AutoScalingConfig currentConfig = zkStateReader.getAutoScalingConfig(watcher);
+ log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, currentConfig.getZkVersion());
+ if (znodeVersion >= currentConfig.getZkVersion()) {
// protect against reordered watcher fires by ensuring that we only move forward
return;
}
- autoScalingConfig = new AutoScalingConfig(data);
- znodeVersion = stat.getVersion();
+ autoScalingConfig = currentConfig;
+ znodeVersion = autoScalingConfig.getZkVersion();
Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, autoScalingConfig);
// remove all active triggers that have been removed from ZK
@@ -305,7 +302,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
AutoScalingConfig.TriggerConfig cfg = entry.getValue();
- AutoScaling.EventType eventType = cfg.eventType;
+ TriggerEventType eventType = cfg.event;
String triggerName = entry.getKey();
triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, cfg.properties));
}