You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/23 18:00:16 UTC

[2/8] lucene-solr:jira/solr-10515: SOLR-10714: OverseerTriggerThread does not start triggers on overseer start until autoscaling config watcher is fired

SOLR-10714: OverseerTriggerThread does not start triggers on overseer start until autoscaling config watcher is fired


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c15a1737
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c15a1737
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c15a1737

Branch: refs/heads/jira/solr-10515
Commit: c15a17379ff7daea0ab1cb8e89e67745e82e68fc
Parents: ab7257f
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Fri May 19 17:51:18 2017 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Fri May 19 17:51:18 2017 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../solr/cloud/autoscaling/AutoScaling.java     |   2 +
 .../autoscaling/OverseerTriggerThread.java      | 160 +++++++++++--------
 .../autoscaling/TriggerIntegrationTest.java     |  56 ++++++-
 4 files changed, 149 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c15a1737/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 211ed14..03f2f39 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -101,6 +101,9 @@ Bug Fixes
 
 * SOLR-10602: Triggers should be able to restore state from old instances when taking over. (shalin)
 
+* SOLR-10714: OverseerTriggerThread does not start triggers on overseer start until autoscaling
+  config watcher is fired. (shalin)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c15a1737/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index cd08ea9..5c992b0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.solr.core.CoreContainer;
 
@@ -123,6 +124,7 @@ public class AutoScaling {
     private boolean isClosed = false;
 
     public TriggerFactory(CoreContainer coreContainer) {
+      Preconditions.checkNotNull(coreContainer);
       this.coreContainer = coreContainer;
     }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c15a1737/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index eca8c0b..cd619c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -54,6 +54,8 @@ public class OverseerTriggerThread implements Runnable, Closeable {
 
   private final ZkStateReader zkStateReader;
 
+  private final SolrZkClient zkClient;
+
   private final ScheduledTriggers scheduledTriggers;
 
   private final AutoScaling.TriggerFactory triggerFactory;
@@ -74,6 +76,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
   public OverseerTriggerThread(ZkController zkController) {
     this.zkController = zkController;
     zkStateReader = zkController.getZkStateReader();
+    zkClient = zkController.getZkClient();
     scheduledTriggers = new ScheduledTriggers();
     triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
   }
@@ -90,13 +93,27 @@ public class OverseerTriggerThread implements Runnable, Closeable {
     }
     IOUtils.closeQuietly(triggerFactory);
     IOUtils.closeQuietly(scheduledTriggers);
+    log.debug("OverseerTriggerThread has been closed explicitly");
   }
 
   @Override
   public void run() {
     int lastZnodeVersion = znodeVersion;
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    createWatcher(zkClient);
+
+    try {
+      refreshAutoScalingConf(new AutoScalingWatcher());
+    } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
+      log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
+    } catch (KeeperException e) {
+      log.error("A ZK error has occurred", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status
+      Thread.currentThread().interrupt();
+      log.warn("Interrupted", e);
+    } catch (Exception e)  {
+      log.error("Unexpected exception", e);
+    }
 
     while (true) {
       Map<String, AutoScaling.Trigger> copy = null;
@@ -104,18 +121,31 @@ public class OverseerTriggerThread implements Runnable, Closeable {
         // this can throw InterruptedException and we don't want to unlock if it did, so we keep this outside
         // of the try/finally block
         updateLock.lockInterruptibly();
+
+        // must check for close here before we await on the condition, otherwise we can only be woken up by interruption
+        if (isClosed) {
+          log.warn("OverseerTriggerThread has been closed, exiting.");
+          break;
+        }
+
+        log.debug("Current znodeVersion {}, lastZnodeVersion {}", znodeVersion, lastZnodeVersion);
+
         try {
           if (znodeVersion == lastZnodeVersion) {
             updated.await();
 
             // are we closed?
-            if (isClosed) break;
+            if (isClosed) {
+              log.warn("OverseerTriggerThread woken up but we are closed, exiting.");
+              break;
+            }
 
             // spurious wakeup?
             if (znodeVersion == lastZnodeVersion) continue;
-            lastZnodeVersion = znodeVersion;
           }
           copy = new HashMap<>(activeTriggers);
+          lastZnodeVersion = znodeVersion;
+          log.debug("Processed trigger updates upto znodeVersion {}", znodeVersion);
         } catch (InterruptedException e) {
           // Restore the interrupted status
           Thread.currentThread().interrupt();
@@ -145,77 +175,65 @@ public class OverseerTriggerThread implements Runnable, Closeable {
     }
   }
 
-  private void createWatcher(SolrZkClient zkClient) {
+  class AutoScalingWatcher implements Watcher  {
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+      // session events are not change events, and do not remove the watcher
+      if (Event.EventType.None.equals(watchedEvent.getType())) {
+        return;
+      }
+
+      try {
+        refreshAutoScalingConf(this);
+      } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
+        log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
+      } catch (KeeperException e) {
+        log.error("A ZK error has occurred", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("Interrupted", e);
+      } catch (Exception e)  {
+        log.error("Unexpected exception", e);
+      }
+    }
+
+  }
+
+  private void refreshAutoScalingConf(Watcher watcher) throws KeeperException, InterruptedException {
+    updateLock.lock();
     try {
-      zkClient.exists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, new Watcher() {
-        @Override
-        public void process(WatchedEvent watchedEvent) {
-          // session events are not change events, and do not remove the watcher
-          if (Event.EventType.None.equals(watchedEvent.getType())) {
-            return;
-          }
+      if (isClosed) {
+        return;
+      }
+      final Stat stat = new Stat();
+      final byte[] data = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, watcher, stat, true);
+      log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, stat.getVersion());
+      if (znodeVersion >= stat.getVersion()) {
+        // protect against reordered watcher fires by ensuring that we only move forward
+        return;
+      }
+      znodeVersion = stat.getVersion();
+      Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
 
-          try {
-            refreshAndWatch();
-          } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
-            log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
-          } catch (KeeperException e) {
-            log.error("A ZK error has occurred", e);
-            throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
-          } catch (InterruptedException e) {
-            // Restore the interrupted status
-            Thread.currentThread().interrupt();
-            log.warn("Interrupted", e);
-          } catch (Exception e)  {
-            log.error("Unexpected exception", e);
-          }
-        }
+      // remove all active triggers that have been removed from ZK
+      Set<String> trackingKeySet = activeTriggers.keySet();
+      trackingKeySet.retainAll(triggerMap.keySet());
 
-        private void refreshAndWatch() throws KeeperException, InterruptedException {
-          updateLock.lock();
-          try {
-            if (isClosed) {
-              return;
-            }
-            final Stat stat = new Stat();
-            final byte[] data = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, this, stat, true);
-            log.debug("{} watcher fired with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, stat.getVersion());
-            if (znodeVersion >= stat.getVersion()) {
-              // protect against reordered watcher fires by ensuring that we only move forward
-              return;
-            }
-            znodeVersion = stat.getVersion();
-            Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
-
-            // remove all active triggers that have been removed from ZK
-            Set<String> trackingKeySet = activeTriggers.keySet();
-            trackingKeySet.retainAll(triggerMap.keySet());
-
-            // now lets add or remove triggers which have been enabled or disabled respectively
-            for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
-              String triggerName = entry.getKey();
-              AutoScaling.Trigger trigger = entry.getValue();
-              if (trigger.isEnabled()) {
-                activeTriggers.put(triggerName, trigger);
-              } else {
-                activeTriggers.remove(triggerName);
-              }
-            }
-            updated.signalAll();
-          } finally {
-            updateLock.unlock();
-          }
+      // now let's add or remove triggers which have been enabled or disabled respectively
+      for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
+        String triggerName = entry.getKey();
+        AutoScaling.Trigger trigger = entry.getValue();
+        if (trigger.isEnabled()) {
+          activeTriggers.put(triggerName, trigger);
+        } else {
+          activeTriggers.remove(triggerName);
         }
-      }, true);
-    } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
-      log.error("OverseerTriggerThread could not talk to ZooKeeper", e);
-    } catch (KeeperException e) {
-      log.error("Exception in OverseerTriggerThread", e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-    } catch (InterruptedException e) {
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-      log.error("OverseerTriggerThread interrupted", e);
+      }
+      updated.signalAll();
+    } finally {
+      updateLock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c15a1737/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 545c0d6..0ac8183 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.util.NamedList;
@@ -73,7 +74,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   }
 
   @Before
-  public void setupTest() throws KeeperException, InterruptedException, IOException, SolrServerException {
+  public void setupTest() throws Exception {
     waitForSeconds = 1 + random().nextInt(3);
     actionCreated = new CountDownLatch(1);
     triggerFiredLatch = new CountDownLatch(1);
@@ -85,6 +86,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     // todo nocommit -- add testing for the v2 path
     // String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
     this.path = "/admin/autoscaling";
+    while (cluster.getJettySolrRunners().size() < 2) {
+      // perhaps a test stopped a node but didn't start it back up,
+      // so let's start a node
+      cluster.startJettySolrRunner();
+    }
   }
 
   @Test
@@ -416,6 +422,54 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         lostNodeName, nodeLostEvent.getNodeName());
   }
 
+  @Test
+  public void testContinueTriggersOnOverseerRestart() throws Exception  {
+    CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    CollectionAdminResponse adminResponse = status.process(solrClient);
+    NamedList<Object> response = adminResponse.getResponse();
+    String leader = (String) response.get("leader");
+    JettySolrRunner overseerNode = null;
+    int index = -1;
+    List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+    for (int i = 0; i < jettySolrRunners.size(); i++) {
+      JettySolrRunner runner = jettySolrRunners.get(i);
+      if (runner.getNodeName().equals(leader)) {
+        overseerNode = runner;
+        index = i;
+        break;
+      }
+    }
+    assertNotNull(overseerNode);
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+    if (!actionCreated.await(3, TimeUnit.SECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    // stop the overseer, somebody else will take over as the overseer
+    cluster.stopJettySolrRunner(index);
+
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    assertNotNull(nodeAddedEvent);
+    assertEquals("The node added trigger was fired but for a different node",
+        newNode.getNodeName(), nodeAddedEvent.getNodeName());
+  }
+
   public static class TestTriggerAction implements TriggerAction {
 
     public TestTriggerAction() {