You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/23 18:00:16 UTC
[2/8] lucene-solr:jira/solr-10515: SOLR-10714: OverseerTriggerThread
does not start triggers on overseer start until autoscaling config watcher is
fired
SOLR-10714: OverseerTriggerThread does not start triggers on overseer start until autoscaling config watcher is fired
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c15a1737
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c15a1737
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c15a1737
Branch: refs/heads/jira/solr-10515
Commit: c15a17379ff7daea0ab1cb8e89e67745e82e68fc
Parents: ab7257f
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Fri May 19 17:51:18 2017 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Fri May 19 17:51:18 2017 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../solr/cloud/autoscaling/AutoScaling.java | 2 +
.../autoscaling/OverseerTriggerThread.java | 160 +++++++++++--------
.../autoscaling/TriggerIntegrationTest.java | 56 ++++++-
4 files changed, 149 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c15a1737/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 211ed14..03f2f39 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -101,6 +101,9 @@ Bug Fixes
* SOLR-10602: Triggers should be able to restore state from old instances when taking over. (shalin)
+* SOLR-10714: OverseerTriggerThread does not start triggers on overseer start until autoscaling
+ config watcher is fired. (shalin)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c15a1737/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index cd08ea9..5c992b0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Preconditions;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.core.CoreContainer;
@@ -123,6 +124,7 @@ public class AutoScaling {
private boolean isClosed = false;
public TriggerFactory(CoreContainer coreContainer) {
+ // fail fast at construction time if no container is supplied (checkNotNull throws NullPointerException)
+ Preconditions.checkNotNull(coreContainer);
this.coreContainer = coreContainer;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c15a1737/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index eca8c0b..cd619c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -54,6 +54,8 @@ public class OverseerTriggerThread implements Runnable, Closeable {
private final ZkStateReader zkStateReader;
+ // ZK client taken from zkController in the constructor; used to read and watch the autoscaling config znode
+ private final SolrZkClient zkClient;
+
private final ScheduledTriggers scheduledTriggers;
private final AutoScaling.TriggerFactory triggerFactory;
@@ -74,6 +76,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
public OverseerTriggerThread(ZkController zkController) {
this.zkController = zkController;
zkStateReader = zkController.getZkStateReader();
+ zkClient = zkController.getZkClient();
scheduledTriggers = new ScheduledTriggers();
triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
}
@@ -90,13 +93,27 @@ public class OverseerTriggerThread implements Runnable, Closeable {
}
IOUtils.closeQuietly(triggerFactory);
IOUtils.closeQuietly(scheduledTriggers);
+ log.debug("OverseerTriggerThread has been closed explicitly");
}
@Override
public void run() {
int lastZnodeVersion = znodeVersion;
- SolrZkClient zkClient = zkStateReader.getZkClient();
- createWatcher(zkClient);
+
+ // Load the autoscaling config eagerly (registering the watcher at the same time) so that triggers are
+ // started as soon as this thread runs, instead of only after the first change to the config znode.
+ // NOTE(review): if this initial read fails (e.g. connection loss), no watch is likely registered and the
+ // loop below presumably waits on 'updated' until close()/interruption -- consider retrying here.
+ try {
+ refreshAutoScalingConf(new AutoScalingWatcher());
+ } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
+ log.warn("Could not load autoscaling conf on startup because Solr cannot talk to ZK: [{}]", e.getMessage());
+ } catch (KeeperException e) {
+ log.error("A ZK error has occurred", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted", e);
+ } catch (Exception e) {
+ log.error("Unexpected exception", e);
+ }
while (true) {
Map<String, AutoScaling.Trigger> copy = null;
@@ -104,18 +121,31 @@ public class OverseerTriggerThread implements Runnable, Closeable {
// this can throw InterruptedException and we don't want to unlock if it did, so we keep this outside
// of the try/finally block
updateLock.lockInterruptibly();
try {
+ // must check for close here before we await on the condition otherwise we can only be woken up on interruption.
+ // The check must be inside this try: breaking out before it would skip the finally that releases
+ // updateLock, leaving the lock held forever and blocking every later refreshAutoScalingConf() call.
+ if (isClosed) {
+ log.warn("OverseerTriggerThread has been closed, exiting.");
+ break;
+ }
+
+ log.debug("Current znodeVersion {}, lastZnodeVersion {}", znodeVersion, lastZnodeVersion);
+
if (znodeVersion == lastZnodeVersion) {
updated.await();
// are we closed?
- if (isClosed) break;
+ if (isClosed) {
+ log.warn("OverseerTriggerThread woken up but we are closed, exiting.");
+ break;
+ }
// spurious wakeup?
if (znodeVersion == lastZnodeVersion) continue;
- lastZnodeVersion = znodeVersion;
}
copy = new HashMap<>(activeTriggers);
+ // record the version here (not only after await) so the first pass, after the eager load, is also tracked
+ lastZnodeVersion = znodeVersion;
+ log.debug("Processed trigger updates upto znodeVersion {}", znodeVersion);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@@ -145,77 +175,65 @@ public class OverseerTriggerThread implements Runnable, Closeable {
}
}
- private void createWatcher(SolrZkClient zkClient) {
+ /**
+ * Watcher on the autoscaling config znode. On every change event it re-reads the config through
+ * refreshAutoScalingConf, passing itself so that it is registered again for the next change.
+ */
+ class AutoScalingWatcher implements Watcher {
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ // session events are not change events, and do not remove the watcher
+ if (Event.EventType.None.equals(watchedEvent.getType())) {
+ return;
+ }
+
+ // NOTE(review): if the ZK read inside refreshAutoScalingConf fails, this watcher is presumably not
+ // re-registered, so later config changes would go unnoticed -- confirm this is acceptable.
+ try {
+ refreshAutoScalingConf(this);
+ } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
+ log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
+ } catch (KeeperException e) {
+ log.error("A ZK error has occurred", e);
+ // NOTE(review): this propagates out of a watcher callback into whichever thread delivers ZK events;
+ // confirm that thread tolerates it.
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted", e);
+ } catch (Exception e) {
+ log.error("Unexpected exception", e);
+ }
+ }
+
+ }
+
+ /**
+ * Reads the autoscaling config znode, registering {@code watcher} on it, and updates activeTriggers
+ * while holding updateLock. Does nothing once the thread is closed. Reads whose znode version is not
+ * newer than the last applied one are ignored; otherwise waiters on 'updated' are signalled.
+ *
+ * @param watcher watcher to register on the config znode for its next change
+ * @throws KeeperException if the config znode cannot be read from ZooKeeper
+ * @throws InterruptedException if interrupted while talking to ZooKeeper
+ */
+ private void refreshAutoScalingConf(Watcher watcher) throws KeeperException, InterruptedException {
+ updateLock.lock();
try {
- zkClient.exists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- // session events are not change events, and do not remove the watcher
- if (Event.EventType.None.equals(watchedEvent.getType())) {
- return;
- }
+ if (isClosed) {
+ return;
+ }
+ final Stat stat = new Stat();
+ final byte[] data = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, watcher, stat, true);
+ log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, stat.getVersion());
+ if (znodeVersion >= stat.getVersion()) {
+ // protect against reordered watcher fires by ensuring that we only move forward
+ return;
+ }
+ // NOTE(review): the version is advanced before loadTriggers runs; if loadTriggers throws, this version
+ // is treated as applied and will not be retried -- confirm this is intended.
+ znodeVersion = stat.getVersion();
+ Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
- try {
- refreshAndWatch();
- } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
- log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
- } catch (KeeperException e) {
- log.error("A ZK error has occurred", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("Interrupted", e);
- } catch (Exception e) {
- log.error("Unexpected exception", e);
- }
- }
+ // remove all active triggers that have been removed from ZK
+ // (keySet() is a live view, so retainAll removes the entries from activeTriggers itself)
+ Set<String> trackingKeySet = activeTriggers.keySet();
+ trackingKeySet.retainAll(triggerMap.keySet());
- private void refreshAndWatch() throws KeeperException, InterruptedException {
- updateLock.lock();
- try {
- if (isClosed) {
- return;
- }
- final Stat stat = new Stat();
- final byte[] data = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, this, stat, true);
- log.debug("{} watcher fired with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, stat.getVersion());
- if (znodeVersion >= stat.getVersion()) {
- // protect against reordered watcher fires by ensuring that we only move forward
- return;
- }
- znodeVersion = stat.getVersion();
- Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
-
- // remove all active triggers that have been removed from ZK
- Set<String> trackingKeySet = activeTriggers.keySet();
- trackingKeySet.retainAll(triggerMap.keySet());
-
- // now lets add or remove triggers which have been enabled or disabled respectively
- for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
- String triggerName = entry.getKey();
- AutoScaling.Trigger trigger = entry.getValue();
- if (trigger.isEnabled()) {
- activeTriggers.put(triggerName, trigger);
- } else {
- activeTriggers.remove(triggerName);
- }
- }
- updated.signalAll();
- } finally {
- updateLock.unlock();
- }
+ // now lets add or remove triggers which have been enabled or disabled respectively
+ for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
+ String triggerName = entry.getKey();
+ AutoScaling.Trigger trigger = entry.getValue();
+ if (trigger.isEnabled()) {
+ activeTriggers.put(triggerName, trigger);
+ } else {
+ activeTriggers.remove(triggerName);
}
- }, true);
- } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
- log.error("OverseerTriggerThread could not talk to ZooKeeper", e);
- } catch (KeeperException e) {
- log.error("Exception in OverseerTriggerThread", e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.error("OverseerTriggerThread interrupted", e);
+ }
+ // wake up run(), which awaits 'updated' while the znode version is unchanged
+ updated.signalAll();
+ } finally {
+ updateLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c15a1737/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 545c0d6..0ac8183 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.util.NamedList;
@@ -73,7 +74,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Before
- public void setupTest() throws KeeperException, InterruptedException, IOException, SolrServerException {
+ public void setupTest() throws Exception {
waitForSeconds = 1 + random().nextInt(3);
actionCreated = new CountDownLatch(1);
triggerFiredLatch = new CountDownLatch(1);
@@ -85,6 +86,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// todo nocommit -- add testing for the v2 path
// String path = random().nextBoolean() ? "/admin/autoscaling" : "/v2/cluster/autoscaling";
this.path = "/admin/autoscaling";
+ // Ensure at least 2 jetty nodes are running before each test.
+ // NOTE(review): 2 presumably matches the node count the cluster is configured with -- keep in sync.
+ while (cluster.getJettySolrRunners().size() < 2) {
+ // perhaps a test stopped a node but didn't start it back
+ // let's start a node
+ cluster.startJettySolrRunner();
+ }
}
@Test
@@ -416,6 +422,54 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
lostNodeName, nodeLostEvent.getNodeName());
}
+ /**
+ * Verifies that triggers keep working across an overseer change: a nodeAdded trigger is set, the current
+ * overseer node is stopped, a new node is started, and the trigger must fire for that new node.
+ */
+ @Test
+ public void testContinueTriggersOnOverseerRestart() throws Exception {
+ CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ CollectionAdminResponse adminResponse = status.process(solrClient);
+ NamedList<Object> response = adminResponse.getResponse();
+ String leader = (String) response.get("leader");
+ assertNotNull("Overseer status did not report a leader", leader);
+ JettySolrRunner overseerNode = null;
+ int index = -1;
+ List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+ for (int i = 0; i < jettySolrRunners.size(); i++) {
+ JettySolrRunner runner = jettySolrRunners.get(i);
+ if (leader.equals(runner.getNodeName())) {
+ overseerNode = runner;
+ index = i;
+ break;
+ }
+ }
+ assertNotNull("Could not find the overseer leader " + leader + " among the running jetty nodes", overseerNode);
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ response = solrClient.request(req);
+ // expected value first so a failure message reads correctly
+ assertEquals("success", response.get("result").toString());
+ if (!actionCreated.await(3, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ // stop the overseer, somebody else will take over as the overseer
+ cluster.stopJettySolrRunner(index);
+
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue("The trigger latch was released but triggerFired was not set", triggerFired.get());
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+ assertNotNull("The trigger fired but no event was recorded", nodeAddedEvent);
+ assertEquals("The node added trigger was fired but for a different node",
+ newNode.getNodeName(), nodeAddedEvent.getNodeName());
+ }
+
public static class TestTriggerAction implements TriggerAction {
public TestTriggerAction() {