Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/29 20:00:16 UTC

[1/2] lucene-solr:jira/solr-10745: SOLR-10745 Improve node registration and de-registration. Modify nodeAdded and nodeLost triggers to use the new information.

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-10745 [created] 3e05eefc9


SOLR-10745 Improve node registration and de-registration. Modify
nodeAdded and nodeLost triggers to use the new information.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/08e00045
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/08e00045
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/08e00045

Branch: refs/heads/jira/solr-10745
Commit: 08e00045a54185606cb438b30e9c1a5860ed8fcd
Parents: f0054a3
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon May 29 21:58:41 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon May 29 21:58:41 2017 +0200

----------------------------------------------------------------------
 .../org/apache/solr/cloud/ZkController.java     |  82 +++++--
 .../cloud/autoscaling/NodeAddedTrigger.java     |  20 +-
 .../solr/cloud/autoscaling/NodeLostTrigger.java |  20 +-
 .../cloud/autoscaling/ScheduledTriggers.java    |  10 +-
 .../autoscaling/TriggerIntegrationTest.java     | 236 ++++++++++++++++++-
 .../solr/common/cloud/LiveNodesListener.java    |  38 +++
 .../apache/solr/common/cloud/ZkStateReader.java |  32 ++-
 7 files changed, 392 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
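
In short, this patch records node membership changes as marker znodes so that triggers can also see events that happened while they were not running: every node that registers itself as live also creates /autoscaling/nodeAdded/<nodeName>, and when nodes disappear from live_nodes the first few surviving nodes (in sorted order, to limit the stampede) race to create /autoscaling/nodeLost/<nodeName>. The nodeAdded and nodeLost triggers then sweep these paths and delete the markers they consume. As a minimal illustration (not part of the patch), a plain ZooKeeper client could inspect the marker paths like this; the connect string is an assumption:

    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    public class MarkerPathsInspector {
      public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // Connect; the session watcher just signals when the connection is up.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {
          if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
            connected.countDown();
          }
        });
        connected.await();
        // List the current markers; these are consumed (deleted) by the triggers.
        List<String> added = zk.getChildren("/autoscaling/nodeAdded", false);
        List<String> lost = zk.getChildren("/autoscaling/nodeLost", false);
        System.out.println("nodeAdded markers: " + added);
        System.out.println("nodeLost markers: " + lost);
        zk.close();
      }
    }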


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index dc7605a..a9004b8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -53,25 +53,7 @@ import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.BeforeReconnect;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DefaultConnectionStrategy;
-import org.apache.solr.common.cloud.DefaultZkACLProvider;
-import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.OnReconnect;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkACLProvider;
-import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkCredentialsProvider;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.cloud.*;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.SolrParams;
@@ -339,7 +321,7 @@ public class ZkController {
               registerAllCoresAsDown(registerOnReconnect, false);
 
               // we have to register as live first to pick up docs in the buffer
-              createEphemeralLiveNode();
+              createLiveEphemeralNode();
 
               List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
               // re register all descriptors
@@ -666,6 +648,8 @@ public class ZkController {
     cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, zkClient);
+    cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, zkClient);
+    cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, zkClient);
     byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
     cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
@@ -680,6 +664,7 @@ public class ZkController {
       this.baseURL = zkStateReader.getBaseUrlForNodeName(this.nodeName);
 
       checkForExistingEphemeralNode();
+      registerLiveNodesListener();
 
      // start the overseer first as the following code may need its processing
       if (!zkRunOnly) {
@@ -698,7 +683,7 @@ public class ZkController {
       }
 
       // Do this last to signal we're up.
-      createEphemeralLiveNode();
+      createLiveEphemeralNode();
     } catch (IOException e) {
       log.error("", e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -750,6 +735,49 @@ public class ZkController {
     }
   }
 
+  private void registerLiveNodesListener() {
+    // this listener is used for generating nodeLost events, so we only check
+    // whether some nodes went missing compared to the last state
+    LiveNodesListener listener = (oldNodes, newNodes) -> {
+      oldNodes.removeAll(newNodes);
+      if (oldNodes.isEmpty()) { // only added nodes
+        log.debug("-- skip, only new nodes: " + newNodes);
+        return;
+      }
+      if (isClosed) {
+        log.debug("-- skip, closed: old=" + oldNodes + ", new=" + newNodes);
+        return;
+      }
+      // if this node is in the top three then attempt to create the nodeLost markers
+      int i = 0;
+      for (String n : newNodes) {
+        if (n.equals(getNodeName())) {
+          break;
+        }
+        if (i > 2) {
+          log.debug("-- skip, " + getNodeName() + " not in the top 3 of " + newNodes);
+          return; // this node is not in the top three
+        }
+        i++;
+      }
+      for (String n : oldNodes) {
+        String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
+        try {
+          // nocommit decide between EPHEMERAL vs. PERSISTENT, the latter needs
+          // explicit cleanup on cluster restart if there are no nodeLost triggers
+          zkClient.create(path, null, CreateMode.PERSISTENT, true);
+          log.debug("-- created " + path);
+        } catch (KeeperException.NodeExistsException e) {
+          // someone else already created this node - ignore
+          log.debug("-- skip, already exists " + path);
+        } catch (KeeperException | InterruptedException e1) {
+          log.warn("Unable to register nodeLost path for " + n, e1);
+        }
+      }
+    };
+    zkStateReader.registerLiveNodesListener(listener);
+  }
+
   public void publishAndWaitForDownStates() throws KeeperException,
       InterruptedException {
 
@@ -817,15 +845,23 @@ public class ZkController {
     return zkClient.isConnected();
   }
 
-  private void createEphemeralLiveNode() throws KeeperException,
+  private void createLiveEphemeralNode() throws KeeperException,
       InterruptedException {
     if (zkRunOnly) {
       return;
     }
     String nodeName = getNodeName();
     String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
+    String nodeAddedPath = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
     log.info("Register node as live in ZooKeeper:" + nodePath);
-    zkClient.makePath(nodePath, CreateMode.EPHEMERAL, true);
+    List<Op> ops = new ArrayList<>(2);
+    ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
+    if (!zkClient.exists(nodeAddedPath, true)) {
+      // nocommit use EPHEMERAL or PERSISTENT?
+      // EPHEMERAL will disappear if this node shuts down, PERSISTENT will need an explicit cleanup
+      ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
+    }
+    zkClient.multi(ops, true);
   }
 
   public String getNodeName() {
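
Note the change above: createLiveEphemeralNode() now creates the live node and its nodeAdded marker in a single multi() transaction, so observers never see one without the other (the marker op is skipped if a marker already exists). A hedged sketch of the same pattern against the raw ZooKeeper API, with illustrative paths and an open ACL:

    import java.util.Arrays;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.Op;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class AtomicLiveNodeRegistration {
      // Create the live node and the nodeAdded marker atomically:
      // either both znodes appear or neither does.
      public static void registerLive(ZooKeeper zk, String nodeName) throws Exception {
        String livePath = "/live_nodes/" + nodeName;
        String markerPath = "/autoscaling/nodeAdded/" + nodeName;
        zk.multi(Arrays.asList(
            Op.create(livePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL),
            Op.create(markerPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)));
      }
    }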

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 642ae56..f304ba7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -29,6 +29,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -37,6 +38,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +62,7 @@ public class NodeAddedTrigger extends TriggerBase {
 
   private Set<String> lastLiveNodes;
 
-  private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
+  private Map<String, Long> nodeNameVsTimeAdded = new ConcurrentHashMap<>();
 
   public NodeAddedTrigger(String name, Map<String, Object> properties,
                           CoreContainer container) {
@@ -227,6 +229,22 @@ public class NodeAddedTrigger extends TriggerBase {
         log.debug("Tracking new node: {} at time {}", n, eventTime);
       });
 
+      // pick up added nodes for which marker paths were created
+      try {
+        List<String> added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+        added.forEach(n -> {
+          log.debug("Adding node from marker path: {}", n);
+          nodeNameVsTimeAdded.put(n, timeSource.getTime());
+          try {
+            container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n, -1, true);
+          } catch (KeeperException | InterruptedException e) {
+            log.debug("Exception removing nodeAdded marker " + n, e);
+          }
+        });
+      } catch (KeeperException | InterruptedException e) {
+        log.warn("Exception retrieving nodeLost markers", e);
+      }
+
       // has enough time expired to trigger events for a node?
       for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
         String nodeName = entry.getKey();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 76c59db..6450bda 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -29,6 +29,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -37,6 +38,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,7 +62,7 @@ public class NodeLostTrigger extends TriggerBase {
 
   private Set<String> lastLiveNodes;
 
-  private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
+  private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
 
   public NodeLostTrigger(String name, Map<String, Object> properties,
                          CoreContainer container) {
@@ -225,6 +227,22 @@ public class NodeLostTrigger extends TriggerBase {
         nodeNameVsTimeRemoved.put(n, timeSource.getTime());
       });
 
+      // pick up lost nodes for which marker paths were created
+      try {
+        List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+        lost.forEach(n -> {
+          log.debug("Adding lost node from marker path: {}", n);
+          nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+          try {
+            container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n, -1, true);
+          } catch (KeeperException | InterruptedException e) {
+            log.warn("Exception removing nodeLost marker " + n, e);
+          }
+        });
+      } catch (KeeperException | InterruptedException e) {
+        log.warn("Exception retrieving nodeLost markers", e);
+      }
+
       // has enough time expired to trigger events for a node?
       for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
         String nodeName = entry.getKey();
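
Both triggers now follow the same sweep pattern in addition to diffing live nodes: list the children of the marker path, record a timestamp for each, then delete the marker so it is consumed only once. A simplified sketch of that pattern (the patch itself goes through SolrZkClient and TimeSource; this version uses the raw client and System.nanoTime()):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;

    public class MarkerSweep {
      private final Map<String, Long> nodeNameVsTime = new ConcurrentHashMap<>();

      // Record each marker under markerPath, then delete it so the event
      // is processed at most once per marker.
      public void sweep(ZooKeeper zk, String markerPath) throws Exception {
        List<String> markers = zk.getChildren(markerPath, false);
        for (String n : markers) {
          nodeNameVsTime.putIfAbsent(n, System.nanoTime());
          try {
            zk.delete(markerPath + "/" + n, -1); // -1 matches any version
          } catch (KeeperException.NoNodeException e) {
            // already consumed elsewhere - ignore
          }
        }
      }
    }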

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 6180b35..d595d4d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud.autoscaling;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -42,6 +43,7 @@ import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -208,7 +210,13 @@ public class ScheduledTriggers implements Closeable {
     }
     try {
       if (zkClient.exists(eventsPath, true)) {
-        zkClient.delete(eventsPath, -1, true);
+        List<String> events = zkClient.getChildren(eventsPath, null, true);
+        List<Op> ops = new ArrayList<>(events.size() + 1);
+        events.forEach(ev -> {
+          ops.add(Op.delete(eventsPath + "/" + ev, -1));
+        });
+        ops.add(Op.delete(eventsPath, -1));
+        zkClient.multi(ops, true);
       }
     } catch (KeeperException | InterruptedException e) {
       log.warn("Failed to remove events for removed trigger " + eventsPath, e);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 1c189ae..b2c95b7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -19,13 +19,16 @@ package org.apache.solr.cloud.autoscaling;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.solr.client.solrj.SolrRequest;
@@ -34,12 +37,16 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.LiveNodesListener;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TimeOut;
 import org.apache.solr.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -65,7 +72,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   private static CountDownLatch actionInterrupted;
   private static CountDownLatch actionCompleted;
   private static AtomicBoolean triggerFired;
-  private static AtomicReference<TriggerEvent> eventRef;
+  private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+  private static ZkStateReader zkStateReader;
 
   private String path;
 
@@ -77,6 +85,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     configureCluster(2)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
+    zkStateReader = cluster.getSolrClient().getZkStateReader();
   }
 
   private static CountDownLatch getTriggerFiredLatch() {
@@ -105,10 +114,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     actionStarted = new CountDownLatch(1);
     actionInterrupted = new CountDownLatch(1);
     actionCompleted = new CountDownLatch(1);
-    eventRef = new AtomicReference<>();
+    events.clear();
     // clear any persisted auto scaling configuration
     Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
     log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+    // clear any events or markers
+    // todo: consider the impact of such cleanup on regular cluster restarts
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+    deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
     // todo nocommit -- add testing for the v2 path
     // String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
     this.path = "/admin/autoscaling";
@@ -119,6 +134,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     }
   }
 
+  private void deleteChildrenRecursively(String path) throws Exception {
+    List<String> paths = zkClient().getChildren(path, null, true);
+    paths.forEach(n -> {
+      try {
+        ZKUtil.deleteRecursive(zkClient().getSolrZooKeeper(), path + "/" + n);
+      } catch (KeeperException | InterruptedException e) {
+        log.warn("Error deleting old data", e);
+      }
+    });
+  }
+
   @Test
   public void testTriggerThrottling() throws Exception  {
     // for this test we want to create two triggers so we must assert that the actions were created twice
@@ -316,7 +342,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
-    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
+    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
     assertNotNull(nodeLostEvent);
     assertEquals("The node added trigger was fired but for a different node",
         nodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
@@ -374,7 +400,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
         newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
@@ -403,7 +429,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
         newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
@@ -466,7 +492,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
-    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
+    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
     assertNotNull(nodeLostEvent);
     assertEquals("The node lost trigger was fired but for a different node",
         lostNodeName, nodeLostEvent.getProperty(TriggerEvent.NODE_NAME));
@@ -538,7 +564,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
         newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
@@ -564,7 +590,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     public void process(TriggerEvent event) {
       try {
         if (triggerFired.compareAndSet(false, true))  {
-          eventRef.set(event);
+          events.add(event);
           if (TimeUnit.MILLISECONDS.convert(timeSource.getTime() - event.getEventTime(), TimeUnit.NANOSECONDS) <= TimeUnit.MILLISECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
             fail("NodeAddedListener was fired before the configured waitFor period");
           }
@@ -608,7 +634,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public void process(TriggerEvent event) {
-      eventRef.set(event);
+      events.add(event);
       getActionStarted().countDown();
       try {
         Thread.sleep(5000);
@@ -666,7 +692,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     boolean await = actionStarted.await(60, TimeUnit.SECONDS);
     assertTrue("action did not start", await);
     // event should be there
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
     assertNotNull(nodeAddedEvent);
     // but action did not complete yet so the event is still enqueued
     assertFalse(triggerFired.get());
@@ -680,7 +706,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     newNode = cluster.startJettySolrRunner();
     // it should fire again but not complete yet
     await = actionStarted.await(60, TimeUnit.SECONDS);
-    TriggerEvent replayedEvent = eventRef.get();
+    TriggerEvent replayedEvent = events.iterator().next();
     assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
     assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
     await = actionCompleted.await(10, TimeUnit.SECONDS);
@@ -724,7 +750,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     // reset
     triggerFired.set(false);
     triggerFiredLatch = new CountDownLatch(1);
-    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
         newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
@@ -737,4 +763,188 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     assertTrue("The trigger did not fire at all", await);
     assertTrue(triggerFired.get());
   }
+
+  private static class TestLiveNodesListener implements LiveNodesListener {
+    Set<String> lostNodes = new HashSet<>();
+    Set<String> addedNodes = new HashSet<>();
+    CountDownLatch onChangeLatch = new CountDownLatch(1);
+
+    public void reset() {
+      lostNodes.clear();
+      addedNodes.clear();
+      onChangeLatch = new CountDownLatch(1);
+    }
+
+    @Override
+    public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+      onChangeLatch.countDown();
+      Set<String> old = new HashSet<>(oldLiveNodes);
+      old.removeAll(newLiveNodes);
+      if (!old.isEmpty()) {
+        lostNodes.addAll(old);
+      }
+      Set<String> added = new HashSet<>(newLiveNodes);
+      added.removeAll(oldLiveNodes);
+      if (!added.isEmpty()) {
+        addedNodes.addAll(added);
+      }
+    }
+  }
+
+  private TestLiveNodesListener registerLiveNodesListener() {
+    TestLiveNodesListener listener = new TestLiveNodesListener();
+    zkStateReader.registerLiveNodesListener(listener);
+    return listener;
+  }
+
+  public static class TestEventMarkerAction implements TriggerAction {
+    // sanity check that an action instance is only invoked once
+    private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
+
+    public TestEventMarkerAction() {
+      actionConstructorCalled.countDown();
+    }
+
+    @Override
+    public String getName() {
+      return "TestTriggerAction";
+    }
+
+    @Override
+    public String getClassName() {
+      return this.getClass().getName();
+    }
+
+    @Override
+    public void process(TriggerEvent event) {
+      boolean locked = lock.tryLock();
+      if (!locked)  {
+        log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+        return;
+      }
+      try {
+        events.add(event);
+        getTriggerFiredLatch().countDown();
+      } catch (Throwable t) {
+        log.debug("--throwable", t);
+        throw t;
+      } finally {
+        if (locked) {
+          lock.unlock();
+        }
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.info("TestTriggerAction init");
+      actionInitCalled.countDown();
+    }
+  }
+
+  @Test
+  public void testNodeEventsRegistration() throws Exception {
+    // for this test we want to create two triggers so we must assert that the actions were created twice
+    actionInitCalled = new CountDownLatch(2);
+    // we want both triggers to fire, producing three events (two nodeAdded, one nodeLost)
+    triggerFiredLatch = new CountDownLatch(3);
+    TestLiveNodesListener listener = registerLiveNodesListener();
+
+    NamedList<Object> overseerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    String overseerLeader = (String) overseerStatus.get("leader");
+    int overseerLeaderIndex = 0;
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (jetty.getNodeName().equals(overseerLeader)) {
+        overseerLeaderIndex = i;
+        break;
+      }
+    }
+    // add a node
+    JettySolrRunner node = cluster.startJettySolrRunner();
+    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+    assertEquals(1, listener.addedNodes.size());
+    assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
+    // verify that a znode exists
+    String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
+    assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
+    listener.reset();
+    // stop overseer, which should also cause nodeLost event
+    cluster.stopJettySolrRunner(overseerLeaderIndex);
+    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+    assertEquals(1, listener.lostNodes.size());
+    assertEquals(overseerLeader, listener.lostNodes.iterator().next());
+    assertEquals(0, listener.addedNodes.size());
+    // verify that a znode exists
+    String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
+    assertTrue("Path " + pathLost + " wasn't created", zkClient().exists(pathLost, true));
+
+    listener.reset();
+    // create another node
+    JettySolrRunner node1 = cluster.startJettySolrRunner();
+    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+    assertEquals(1, listener.addedNodes.size());
+    assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
+    // verify that a znode exists
+    pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
+    assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
+
+    // set up triggers
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+        "}}";
+    req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+      fail("Both triggers should have fired by now");
+    }
+    assertEquals(3, events.size());
+    // 2 nodeAdded, 1 nodeLost
+    int nodeAdded = 0;
+    int nodeLost = 0;
+    for (TriggerEvent ev : events) {
+      String nodeName = (String)ev.getProperty(TriggerEvent.NODE_NAME);
+      if (ev.eventType.equals(AutoScaling.EventType.NODELOST)) {
+        assertEquals(overseerLeader, nodeName);
+        nodeLost++;
+      } else if (ev.eventType.equals(AutoScaling.EventType.NODEADDED)) {
+        assertTrue(nodeName + " not one of: " + node.getNodeName() + ", " + node1.getNodeName(),
+            nodeName.equals(node.getNodeName()) || nodeName.equals(node1.getNodeName()));
+        nodeAdded++;
+      }
+    }
+    assertEquals(1, nodeLost);
+    assertEquals(2, nodeAdded);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
new file mode 100644
index 0000000..1cf16e1
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.util.SortedSet;
+
+/**
+ * Listener that can be used with {@link ZkStateReader#registerLiveNodesListener(LiveNodesListener)}
+ * and called whenever the live nodes set changes.
+ */
+public interface LiveNodesListener {
+
+  /**
+   * Called when a change in the live nodes set occurs.
+   *
+   * Note that, due to the way ZooKeeper watchers are implemented, a single call may be
+   * the result of several state changes.
+   *
+   * @param oldLiveNodes set of live nodes before the change
+   * @param newLiveNodes set of live nodes after the change
+   */
+  void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
+}
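
Since LiveNodesListener has a single abstract method, it can be implemented as a lambda, as registerLiveNodesListener() in ZkController above already does. A small usage sketch that diffs the two sets without mutating them (the sets passed in should be treated as read-only):

    import java.util.SortedSet;
    import java.util.TreeSet;
    import org.apache.solr.common.cloud.LiveNodesListener;
    import org.apache.solr.common.cloud.ZkStateReader;

    public class LiveNodesDiffExample {
      // Log which nodes joined and which were lost on every live nodes change.
      public static LiveNodesListener watch(ZkStateReader reader) {
        LiveNodesListener listener = (oldNodes, newNodes) -> {
          SortedSet<String> lost = new TreeSet<>(oldNodes);
          lost.removeAll(newNodes);
          SortedSet<String> added = new TreeSet<>(newNodes);
          added.removeAll(oldNodes);
          System.out.println("lost=" + lost + ", added=" + added);
        };
        reader.registerLiveNodesListener(listener);
        return listener; // keep it, so it can later be removed via removeLiveNodesListener()
      }
    }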

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/08e00045/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index f23b0b5..afb4acf 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -59,6 +60,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.EMPTY_MAP;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static java.util.Collections.emptySortedSet;
 import static java.util.Collections.unmodifiableSet;
 import static org.apache.solr.common.util.Utils.fromJSON;
 
@@ -94,6 +96,8 @@ public class ZkStateReader implements Closeable {
   public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json";
   public static final String SOLR_AUTOSCALING_EVENTS_PATH = "/autoscaling/events";
   public static final String SOLR_AUTOSCALING_TRIGGER_STATE_PATH = "/autoscaling/triggerState";
+  public static final String SOLR_AUTOSCALING_NODE_ADDED_PATH = "/autoscaling/nodeAdded";
+  public static final String SOLR_AUTOSCALING_NODE_LOST_PATH = "/autoscaling/nodeLost";
 
   public static final String REPLICATION_FACTOR = "replicationFactor";
   public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
@@ -134,7 +138,7 @@ public class ZkStateReader implements Closeable {
   /** Collections with format2 state.json, not "interesting" and not actively watched. */
   private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
 
-  private volatile Set<String> liveNodes = emptySet();
+  private volatile SortedSet<String> liveNodes = emptySortedSet();
 
   private volatile Map<String, Object> clusterProperties = Collections.emptyMap();
 
@@ -159,6 +163,8 @@ public class ZkStateReader implements Closeable {
 
   }
 
+  private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
+
   public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
       LEGACY_CLOUD,
       URL_SCHEME,
@@ -656,25 +662,25 @@ public class ZkStateReader implements Closeable {
   // We don't get a Stat or track versions on getChildren() calls, so force linearization.
   private final Object refreshLiveNodesLock = new Object();
   // Ensures that only the latest getChildren fetch gets applied.
-  private final AtomicReference<Set<String>> lastFetchedLiveNodes = new AtomicReference<>();
+  private final AtomicReference<SortedSet<String>> lastFetchedLiveNodes = new AtomicReference<>();
 
   /**
    * Refresh live_nodes.
    */
   private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
     synchronized (refreshLiveNodesLock) {
-      Set<String> newLiveNodes;
+      SortedSet<String> newLiveNodes;
       try {
         List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
-        newLiveNodes = new HashSet<>(nodeList);
+        newLiveNodes = new TreeSet<>(nodeList);
       } catch (KeeperException.NoNodeException e) {
-        newLiveNodes = emptySet();
+        newLiveNodes = emptySortedSet();
       }
       lastFetchedLiveNodes.set(newLiveNodes);
     }
 
     // Can't lock getUpdateLock() until we release the other, it would cause deadlock.
-    Set<String> oldLiveNodes, newLiveNodes;
+    SortedSet<String> oldLiveNodes, newLiveNodes;
     synchronized (getUpdateLock()) {
       newLiveNodes = lastFetchedLiveNodes.getAndSet(null);
       if (newLiveNodes == null) {
@@ -692,10 +698,22 @@ public class ZkStateReader implements Closeable {
       LOG.info("Updated live nodes from ZooKeeper... ({}) -> ({})", oldLiveNodes.size(), newLiveNodes.size());
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Updated live nodes from ZooKeeper... {} -> {}", new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes));
+      LOG.debug("Updated live nodes from ZooKeeper... {} -> {}", oldLiveNodes, newLiveNodes);
+    }
+    if (!oldLiveNodes.equals(newLiveNodes)) { // fire listeners
+      liveNodesListeners.forEach(listener ->
+          listener.onChange(new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes)));
     }
   }
 
+  public void registerLiveNodesListener(LiveNodesListener listener) {
+    liveNodesListeners.add(listener);
+  }
+
+  public void removeLiveNodesListener(LiveNodesListener listener) {
+    liveNodesListeners.remove(listener);
+  }
+
   /**
    * @return information about the cluster from ZooKeeper
    */


[2/2] lucene-solr:jira/solr-10745: Merge branch 'feature/autoscaling' into jira/solr-10745

Posted by ab...@apache.org.
Merge branch 'feature/autoscaling' into jira/solr-10745


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3e05eefc
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3e05eefc
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3e05eefc

Branch: refs/heads/jira/solr-10745
Commit: 3e05eefc952dfdc637aa6403d4b5cedd1cf54d89
Parents: 08e0004 cae6b6e
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon May 29 22:00:01 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon May 29 22:00:01 2017 +0200

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../cloud/autoscaling/AutoScalingHandler.java   | 56 ++++++++++---
 .../resources/apispec/autoscaling.Commands.json |  3 +-
 .../autoscaling/AutoScalingHandlerTest.java     | 71 ++++++++---------
 .../cloud/autoscaling/NodeAddedTriggerTest.java | 22 +++++-
 .../cloud/autoscaling/NodeLostTriggerTest.java  | 23 ++++--
 .../solr/cloud/autoscaling/TestPolicyCloud.java | 38 ++++++++-
 .../solrj/impl/SolrClientDataProvider.java      | 76 ++++++++++++------
 .../cloud/autoscaling/AddReplicaSuggester.java  |  2 +-
 .../org/apache/solr/cloud/autoscaling/Cell.java |  8 +-
 .../apache/solr/cloud/autoscaling/Clause.java   | 83 ++++++++++++--------
 .../cloud/autoscaling/ClusterDataProvider.java  |  2 +-
 .../cloud/autoscaling/MoveReplicaSuggester.java | 36 ++++++---
 .../apache/solr/cloud/autoscaling/Policy.java   | 77 ++++++++++--------
 .../solr/cloud/autoscaling/PolicyHelper.java    |  8 +-
 .../solr/cloud/autoscaling/Preference.java      | 14 +++-
 .../org/apache/solr/cloud/autoscaling/Row.java  | 26 +++---
 .../apache/solr/common/cloud/SolrZkClient.java  |  7 +-
 .../solr/common/cloud/rule/ImplicitSnitch.java  |  7 +-
 .../apache/solr/common/params/CommonParams.java |  2 +
 .../solr/cloud/autoscaling/TestPolicy.java      | 37 ++++++---
 21 files changed, 396 insertions(+), 204 deletions(-)
----------------------------------------------------------------------