You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/29 20:00:16 UTC
[1/2] lucene-solr:jira/solr-10745: SOLR-10745 Improved node
registration and de-registration. Modify nodeAdded and nodeLost triggers to
use the new information.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-10745 [created] 3e05eefc9
SOLR-10745 Improved node registration and de-registration. Modify
nodeAdded and nodeLost triggers to use the new information.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/08e00045
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/08e00045
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/08e00045
Branch: refs/heads/jira/solr-10745
Commit: 08e00045a54185606cb438b30e9c1a5860ed8fcd
Parents: f0054a3
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon May 29 21:58:41 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon May 29 21:58:41 2017 +0200
----------------------------------------------------------------------
.../org/apache/solr/cloud/ZkController.java | 82 +++++--
.../cloud/autoscaling/NodeAddedTrigger.java | 20 +-
.../solr/cloud/autoscaling/NodeLostTrigger.java | 20 +-
.../cloud/autoscaling/ScheduledTriggers.java | 10 +-
.../autoscaling/TriggerIntegrationTest.java | 236 ++++++++++++++++++-
.../solr/common/cloud/LiveNodesListener.java | 38 +++
.../apache/solr/common/cloud/ZkStateReader.java | 32 ++-
7 files changed, 392 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index dc7605a..a9004b8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -53,25 +53,7 @@ import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.BeforeReconnect;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DefaultConnectionStrategy;
-import org.apache.solr.common.cloud.DefaultZkACLProvider;
-import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.OnReconnect;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkACLProvider;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkCredentialsProvider;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.cloud.*;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
@@ -339,7 +321,7 @@ public class ZkController {
registerAllCoresAsDown(registerOnReconnect, false);
// we have to register as live first to pick up docs in the buffer
- createEphemeralLiveNode();
+ createLiveEphemeralNode();
List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
// re register all descriptors
@@ -666,6 +648,8 @@ public class ZkController {
cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, zkClient);
+ cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, zkClient);
+ cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, zkClient);
byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
@@ -680,6 +664,7 @@ public class ZkController {
this.baseURL = zkStateReader.getBaseUrlForNodeName(this.nodeName);
checkForExistingEphemeralNode();
+ registerLiveNodesListener();
// start the overseer first as following code may need it's processing
if (!zkRunOnly) {
@@ -698,7 +683,7 @@ public class ZkController {
}
// Do this last to signal we're up.
- createEphemeralLiveNode();
+ createLiveEphemeralNode();
} catch (IOException e) {
log.error("", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -750,6 +735,49 @@ public class ZkController {
}
}
+ private void registerLiveNodesListener() {
+ // this listener is used for generating nodeLost events, so we check only if
+ // some nodes went missing compared to last state
+ LiveNodesListener listener = (oldNodes, newNodes) -> {
+ oldNodes.removeAll(newNodes);
+ if (oldNodes.isEmpty()) { // only added nodes
+ log.debug("-- skip, only new nodes: " + newNodes);
+ return;
+ }
+ if (isClosed) {
+ log.debug("-- skip, closed: old=" + oldNodes + ", new=" + newNodes);
+ return;
+ }
+ // if this node is in the top three then attempt to create nodeLost message
+ int i = 0;
+ for (String n : newNodes) {
+ if (n.equals(getNodeName())) {
+ break;
+ }
+ if (i > 2) {
+ log.debug("-- skip, " + getNodeName() + " not in the top 3 of " + newNodes);
+ return; // this node is not in the top three
+ }
+ i++;
+ }
+ for (String n : oldNodes) {
+ String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
+ try {
+ // nocommit decide between EPHEMERAL vs. PERSISTENT, the latter needs
+ // explicit cleanup on cluster restart if there are no nodeLost triggers
+ zkClient.create(path, null, CreateMode.PERSISTENT, true);
+ log.debug("-- created " + path);
+ } catch (KeeperException.NodeExistsException e) {
+ // someone else already created this node - ignore
+ log.debug("-- skip, already exists " + path);
+ } catch (KeeperException | InterruptedException e1) {
+ log.warn("Unable to register nodeLost path for " + n, e1);
+ }
+ }
+ };
+ zkStateReader.registerLiveNodesListener(listener);
+ }
+
public void publishAndWaitForDownStates() throws KeeperException,
InterruptedException {
@@ -817,15 +845,23 @@ public class ZkController {
return zkClient.isConnected();
}
- private void createEphemeralLiveNode() throws KeeperException,
+ private void createLiveEphemeralNode() throws KeeperException,
InterruptedException {
if (zkRunOnly) {
return;
}
String nodeName = getNodeName();
String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
+ String nodeAddedPath = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
log.info("Register node as live in ZooKeeper:" + nodePath);
- zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
+ List<Op> ops = new ArrayList<>(2);
+ ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
+ if (!zkClient.exists(nodeAddedPath, true)) {
+ // nocommit use EPHEMERAL or PERSISTENT?
+ // EPHEMERAL will disappear if this node shuts down, PERSISTENT will need an explicit cleanup
+ ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
+ }
+ zkClient.multi(ops, true);
}
public String getNodeName() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 642ae56..f304ba7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -29,6 +29,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -37,6 +38,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +62,7 @@ public class NodeAddedTrigger extends TriggerBase {
private Set<String> lastLiveNodes;
- private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
+ private Map<String, Long> nodeNameVsTimeAdded = new ConcurrentHashMap<>();
public NodeAddedTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
@@ -227,6 +229,22 @@ public class NodeAddedTrigger extends TriggerBase {
log.debug("Tracking new node: {} at time {}", n, eventTime);
});
+ // pick up added nodes for which marker paths were created
+ try {
+ List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+ lost.forEach(n -> {
+ log.debug("Adding node from marker path: {}", n);
+ nodeNameVsTimeAdded.put(n, timeSource.getTime());
+ try {
+ container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n, -1, true);
+ } catch (KeeperException | InterruptedException e) {
+ log.debug("Exception removing nodeAdded marker " + n, e);
+ }
+ });
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception retrieving nodeLost markers", e);
+ }
+
// has enough time expired to trigger events for a node?
for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
String nodeName = entry.getKey();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 76c59db..6450bda 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -29,6 +29,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -37,6 +38,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,7 +62,7 @@ public class NodeLostTrigger extends TriggerBase {
private Set<String> lastLiveNodes;
- private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
+ private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
public NodeLostTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
@@ -225,6 +227,22 @@ public class NodeLostTrigger extends TriggerBase {
nodeNameVsTimeRemoved.put(n, timeSource.getTime());
});
+ // pick up lost nodes for which marker paths were created
+ try {
+ List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+ lost.forEach(n -> {
+ log.debug("Adding lost node from marker path: {}", n);
+ nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+ try {
+ container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n, -1, true);
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception removing nodeLost marker " + n, e);
+ }
+ });
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception retrieving nodeLost markers", e);
+ }
+
// has enough time expired to trigger events for a node?
for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
String nodeName = entry.getKey();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 6180b35..d595d4d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,7 +210,13 @@ public class ScheduledTriggers implements Closeable {
}
try {
if (zkClient.exists(eventsPath, true)) {
- zkClient.delete(eventsPath, -1, true);
+ List<String> events = zkClient.getChildren(eventsPath, null, true);
+ List<Op> ops = new ArrayList<>(events.size() + 1);
+ events.forEach(ev -> {
+ ops.add(Op.delete(eventsPath + "/" + ev, -1));
+ });
+ ops.add(Op.delete(eventsPath, -1));
+ zkClient.multi(ops, true);
}
} catch (KeeperException | InterruptedException e) {
log.warn("Failed to remove events for removed trigger " + eventsPath, e);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 1c189ae..b2c95b7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -19,13 +19,16 @@ package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.SolrRequest;
@@ -34,12 +37,16 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -65,7 +72,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
private static CountDownLatch actionInterrupted;
private static CountDownLatch actionCompleted;
private static AtomicBoolean triggerFired;
- private static AtomicReference<TriggerEvent> eventRef;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static ZkStateReader zkStateReader;
private String path;
@@ -77,6 +85,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
+ zkStateReader = cluster.getSolrClient().getZkStateReader();
}
private static CountDownLatch getTriggerFiredLatch() {
@@ -105,10 +114,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
actionStarted = new CountDownLatch(1);
actionInterrupted = new CountDownLatch(1);
actionCompleted = new CountDownLatch(1);
- eventRef = new AtomicReference<>();
+ events.clear();
// clear any persisted auto scaling configuration
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+ // clear any events or markers
+ // todo: consider the impact of such cleanup on regular cluster restarts
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
// todo nocommit -- add testing for the v2 path
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
this.path = "/admin/autoscaling";
@@ -119,6 +134,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
+ private void deleteChildrenRecursively(String path) throws Exception {
+ List<String> paths = zkClient().getChildren(path, null, true);
+ paths.forEach(n -> {
+ try {
+ ZKUtil.deleteRecursive(zkClient().getSolrZooKeeper(), path + "/" + n);
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Error deleting old data", e);
+ }
+ });
+ }
+
@Test
public void testTriggerThrottling() throws Exception {
// for this test we want to create two triggers so we must assert that the actions were created twice
@@ -316,7 +342,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
- NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
+ NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
assertNotNull(nodeLostEvent);
assertEquals("The node added trigger was fired but for a different node",
nodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
@@ -374,7 +400,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
@@ -403,7 +429,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
@@ -466,7 +492,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
- NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
+ NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
assertNotNull(nodeLostEvent);
assertEquals("The node lost trigger was fired but for a different node",
lostNodeName, nodeLostEvent.getProperty(TriggerEvent.NODE_NAME));
@@ -538,7 +564,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
@@ -564,7 +590,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
public void process(TriggerEvent event) {
try {
if (triggerFired.compareAndSet(false, true)) {
- eventRef.set(event);
+ events.add(event);
if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
fail("NodeAddedListener was fired before the configured waitFor period");
}
@@ -608,7 +634,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public void process(TriggerEvent event) {
- eventRef.set(event);
+ events.add(event);
getActionStarted().countDown();
try {
Thread.sleep(5000);
@@ -666,7 +692,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
boolean await = actionStarted.await(60, TimeUnit.SECONDS);
assertTrue("action did not start", await);
// event should be there
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
assertNotNull(nodeAddedEvent);
// but action did not complete yet so the event is still enqueued
assertFalse(triggerFired.get());
@@ -680,7 +706,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
newNode = cluster.startJettySolrRunner();
// it should fire again but not complete yet
await = actionStarted.await(60, TimeUnit.SECONDS);
- TriggerEvent replayedEvent = eventRef.get();
+ TriggerEvent replayedEvent = events.iterator().next();
assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
await = actionCompleted.await(10, TimeUnit.SECONDS);
@@ -724,7 +750,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// reset
triggerFired.set(false);
triggerFiredLatch = new CountDownLatch(1);
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
@@ -737,4 +763,188 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
}
+
+ private static class TestLiveNodesListener implements LiveNodesListener {
+ Set<String> lostNodes = new HashSet<>();
+ Set<String> addedNodes = new HashSet<>();
+ CountDownLatch onChangeLatch = new CountDownLatch(1);
+
+ public void reset() {
+ lostNodes.clear();
+ addedNodes.clear();
+ onChangeLatch = new CountDownLatch(1);
+ }
+
+ @Override
+ public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+ onChangeLatch.countDown();
+ Set<String> old = new HashSet<>(oldLiveNodes);
+ old.removeAll(newLiveNodes);
+ if (!old.isEmpty()) {
+ lostNodes.addAll(old);
+ }
+ newLiveNodes.removeAll(oldLiveNodes);
+ if (!newLiveNodes.isEmpty()) {
+ addedNodes.addAll(newLiveNodes);
+ }
+ }
+ }
+
+ private TestLiveNodesListener registerLiveNodesListener() {
+ TestLiveNodesListener listener = new TestLiveNodesListener();
+ zkStateReader.registerLiveNodesListener(listener);
+ return listener;
+ }
+
+ public static class TestEventMarkerAction implements TriggerAction {
+ // sanity check that an action instance is only invoked once
+ private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
+
+ public TestEventMarkerAction() {
+ actionConstructorCalled.countDown();
+ }
+
+ @Override
+ public String getName() {
+ return "TestTriggerAction";
+ }
+
+ @Override
+ public String getClassName() {
+ return this.getClass().getName();
+ }
+
+ @Override
+ public void process(TriggerEvent event) {
+ boolean locked = lock.tryLock();
+ if (!locked) {
+ log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+ return;
+ }
+ try {
+ events.add(event);
+ getTriggerFiredLatch().countDown();
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ } finally {
+ if (locked) {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ log.info("TestTriggerAction init");
+ actionInitCalled.countDown();
+ }
+ }
+
+ @Test
+ public void testNodeEventsRegistration() throws Exception {
+ // for this test we want to create two triggers so we must assert that the actions were created twice
+ actionInitCalled = new CountDownLatch(2);
+ // we expect three trigger events in total: 2 nodeAdded and 1 nodeLost
+ triggerFiredLatch = new CountDownLatch(3);
+ TestLiveNodesListener listener = registerLiveNodesListener();
+
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+ // add a node
+ JettySolrRunner node = cluster.startJettySolrRunner();
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.addedNodes.size());
+ assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
+ // verify that a znode exists
+ String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
+ assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
+ listener.reset();
+ // stop overseer, which should also cause nodeLost event
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.lostNodes.size());
+ assertEquals(overseerLeader, listener.lostNodes.iterator().next());
+ assertEquals(0, listener.addedNodes.size());
+ // verify that a znode exists
+ String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
+ assertTrue("Path " + pathLost + " wasn't created", zkClient().exists(pathLost, true));
+
+ listener.reset();
+ // create another node
+ JettySolrRunner node1 = cluster.startJettySolrRunner();
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.addedNodes.size());
+ assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
+ // verify that a znode exists
+ pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
+ assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
+
+ // set up triggers
+ CloudSolrClient solrClient = cluster.getSolrClient();
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+ "}}";
+ req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+ fail("Both triggers should have fired by now");
+ }
+ assertEquals(3, events.size());
+ // 2 nodeAdded, 1 nodeLost
+ int nodeAdded = 0;
+ int nodeLost = 0;
+ for (TriggerEvent ev : events) {
+ String nodeName = (String)ev.getProperty(TriggerEvent.NODE_NAME);
+ if (ev.eventType.equals(AutoScaling.EventType.NODELOST)) {
+ assertEquals(overseerLeader, nodeName);
+ nodeLost++;
+ } else if (ev.eventType.equals(AutoScaling.EventType.NODEADDED)) {
+ assertTrue(nodeName + " not one of: " + node.getNodeName() + ", " + node1.getNodeName(),
+ nodeName.equals(node.getNodeName()) || nodeName.equals(node1.getNodeName()));
+ nodeAdded++;
+ }
+ }
+ assertEquals(1, nodeLost);
+ assertEquals(2, nodeAdded);
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
new file mode 100644
index 0000000..1cf16e1
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.util.SortedSet;
+
+/**
+ * Listener that can be used with {@link ZkStateReader#registerLiveNodesListener(LiveNodesListener)}
+ * and called whenever the live nodes set changes.
+ */
+public interface LiveNodesListener {
+
+ /**
+ * Called when a change in the live nodes set occurs.
+ *
+ * Note that, due to the way ZooKeeper watchers are implemented, a single call may be
+ * the result of several state changes.
+ *
+ * @param oldLiveNodes set of live nodes before the change
+ * @param newLiveNodes set of live nodes after the change
+ */
+ void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index f23b0b5..afb4acf 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -59,6 +60,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.EMPTY_MAP;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
+import static java.util.Collections.emptySortedSet;
import static java.util.Collections.unmodifiableSet;
import static org.apache.solr.common.util.Utils.fromJSON;
@@ -94,6 +96,8 @@ public class ZkStateReader implements Closeable {
public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json";
public static final String SOLR_AUTOSCALING_EVENTS_PATH = "/autoscaling/events";
public static final String SOLR_AUTOSCALING_TRIGGER_STATE_PATH = "/autoscaling/triggerState";
+ public static final String SOLR_AUTOSCALING_NODE_ADDED_PATH = "/autoscaling/nodeAdded";
+ public static final String SOLR_AUTOSCALING_NODE_LOST_PATH = "/autoscaling/nodeLost";
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
@@ -134,7 +138,7 @@ public class ZkStateReader implements Closeable {
/** Collections with format2 state.json, not "interesting" and not actively watched. */
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
- private volatile Set<String> liveNodes = emptySet();
+ private volatile SortedSet<String> liveNodes = emptySortedSet();
private volatile Map<String, Object> clusterProperties = Collections.emptyMap();
@@ -159,6 +163,8 @@ public class ZkStateReader implements Closeable {
}
+ private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
+
public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
LEGACY_CLOUD,
URL_SCHEME,
@@ -656,25 +662,25 @@ public class ZkStateReader implements Closeable {
// We don't get a Stat or track versions on getChildren() calls, so force linearization.
private final Object refreshLiveNodesLock = new Object();
// Ensures that only the latest getChildren fetch gets applied.
- private final AtomicReference<Set<String>> lastFetchedLiveNodes = new AtomicReference<>();
+ private final AtomicReference<SortedSet<String>> lastFetchedLiveNodes = new AtomicReference<>();
/**
* Refresh live_nodes.
*/
private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
synchronized (refreshLiveNodesLock) {
- Set<String> newLiveNodes;
+ SortedSet<String> newLiveNodes;
try {
List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
- newLiveNodes = new HashSet<>(nodeList);
+ newLiveNodes = new TreeSet<>(nodeList);
} catch (KeeperException.NoNodeException e) {
- newLiveNodes = emptySet();
+ newLiveNodes = emptySortedSet();
}
lastFetchedLiveNodes.set(newLiveNodes);
}
// Can't lock getUpdateLock() until we release the other, it would cause deadlock.
- Set<String> oldLiveNodes, newLiveNodes;
+ SortedSet<String> oldLiveNodes, newLiveNodes;
synchronized (getUpdateLock()) {
newLiveNodes = lastFetchedLiveNodes.getAndSet(null);
if (newLiveNodes == null) {
@@ -692,10 +698,22 @@ public class ZkStateReader implements Closeable {
LOG.info("Updated live nodes from ZooKeeper... ({}) -> ({})", oldLiveNodes.size(), newLiveNodes.size());
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Updated live nodes from ZooKeeper... {} -> {}", new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes));
+ LOG.debug("Updated live nodes from ZooKeeper... {} -> {}", oldLiveNodes, newLiveNodes);
+ }
+ if (!oldLiveNodes.equals(newLiveNodes)) { // fire listeners
+ liveNodesListeners.forEach(listener ->
+ listener.onChange(new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes)));
}
}
+ public void registerLiveNodesListener(LiveNodesListener listener) {
+ liveNodesListeners.add(listener);
+ }
+
+ public void removeLiveNodesListener(LiveNodesListener listener) {
+ liveNodesListeners.remove(listener);
+ }
+
/**
* @return information about the cluster from ZooKeeper
*/
[2/2] lucene-solr:jira/solr-10745: Merge branch 'feature/autoscaling'
into jira/solr-10745
Posted by ab...@apache.org.
Merge branch 'feature/autoscaling' into jira/solr-10745
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3e05eefc
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3e05eefc
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3e05eefc
Branch: refs/heads/jira/solr-10745
Commit: 3e05eefc952dfdc637aa6403d4b5cedd1cf54d89
Parents: 08e0004 cae6b6e
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon May 29 22:00:01 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon May 29 22:00:01 2017 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../cloud/autoscaling/AutoScalingHandler.java | 56 ++++++++++---
.../resources/apispec/autoscaling.Commands.json | 3 +-
.../autoscaling/AutoScalingHandlerTest.java | 71 ++++++++---------
.../cloud/autoscaling/NodeAddedTriggerTest.java | 22 +++++-
.../cloud/autoscaling/NodeLostTriggerTest.java | 23 ++++--
.../solr/cloud/autoscaling/TestPolicyCloud.java | 38 ++++++++-
.../solrj/impl/SolrClientDataProvider.java | 76 ++++++++++++------
.../cloud/autoscaling/AddReplicaSuggester.java | 2 +-
.../org/apache/solr/cloud/autoscaling/Cell.java | 8 +-
.../apache/solr/cloud/autoscaling/Clause.java | 83 ++++++++++++--------
.../cloud/autoscaling/ClusterDataProvider.java | 2 +-
.../cloud/autoscaling/MoveReplicaSuggester.java | 36 ++++++---
.../apache/solr/cloud/autoscaling/Policy.java | 77 ++++++++++--------
.../solr/cloud/autoscaling/PolicyHelper.java | 8 +-
.../solr/cloud/autoscaling/Preference.java | 14 +++-
.../org/apache/solr/cloud/autoscaling/Row.java | 26 +++---
.../apache/solr/common/cloud/SolrZkClient.java | 7 +-
.../solr/common/cloud/rule/ImplicitSnitch.java | 7 +-
.../apache/solr/common/params/CommonParams.java | 2 +
.../solr/cloud/autoscaling/TestPolicy.java | 37 ++++++---
21 files changed, 396 insertions(+), 204 deletions(-)
----------------------------------------------------------------------