You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/18 19:20:50 UTC
[2/2] lucene-solr:jira/solr-10515: SOLR-10515: Use MapWriter for serialization of TriggerEvents. WIP on unit testing.
SOLR-10515: Use MapWriter for serialization of TriggerEvents. WIP on unit testing.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e15ce809
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e15ce809
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e15ce809
Branch: refs/heads/jira/solr-10515
Commit: e15ce80980b854c60e2e287f0e39afcc7f3f14a5
Parents: d583dcd
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu May 18 21:19:40 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu May 18 21:19:40 2017 +0200
----------------------------------------------------------------------
.../solr/cloud/autoscaling/AutoScaling.java | 3 +-
.../cloud/autoscaling/NodeAddedTrigger.java | 1 -
.../solr/cloud/autoscaling/NodeLostTrigger.java | 1 -
.../autoscaling/OverseerTriggerThread.java | 2 +-
.../cloud/autoscaling/ScheduledTriggers.java | 1 +
.../cloud/autoscaling/TriggerEventBase.java | 12 +++
.../cloud/autoscaling/TriggerEventQueue.java | 8 +-
.../autoscaling/TriggerIntegrationTest.java | 107 ++++++++++++++++++-
8 files changed, 119 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 0b6aca3..e8d9b5e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.core.CoreContainer;
public class AutoScaling {
@@ -48,7 +49,7 @@ public class AutoScaling {
AFTER_ACTION
}
- public interface TriggerEvent {
+ public interface TriggerEvent extends MapWriter {
EventType getEventType();
String getSource();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 86cc13e..dac40b5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -226,7 +226,6 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
}
public static class NodeAddedEvent extends TriggerEventBase {
- public static final String NODE_NAME = "nodeName";
public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedNanoTime, String nodeAdded) {
super(eventType, source, nodeAddedNanoTime, Collections.singletonMap(NODE_NAME, nodeAdded));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 35c94ac..4eb71fe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -220,7 +220,6 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
}
public static class NodeLostEvent extends TriggerEventBase {
- public static final String NODE_NAME = "nodeName";
public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostNanoTime, String nodeRemoved) {
super(eventType, source, nodeLostNanoTime, Collections.singletonMap(NODE_NAME, nodeRemoved));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 0db38fb..dcc76b4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -98,7 +98,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
SolrZkClient zkClient = zkStateReader.getZkClient();
createWatcher(zkClient);
- while (true) {
+ while (!isClosed) {
Map<String, AutoScaling.Trigger> copy = null;
try {
// this can throw InterruptedException and we don't want to unlock if it did, so we keep this outside
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 533fbc9..74b61a1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -225,6 +225,7 @@ public class ScheduledTriggers implements Closeable {
@Override
public void run() {
+ log.info("--running " + trigger.getName());
// replay accumulated events first, if any
AutoScaling.TriggerEvent event;
while ((event = queue.pollEvent()) != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
index 2290e40..168dfaa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
@@ -16,14 +16,18 @@
*/
package org.apache.solr.cloud.autoscaling;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.solr.common.MapWriter;
+
/**
* Base class for event implementations.
*/
public abstract class TriggerEventBase implements AutoScaling.TriggerEvent {
public static final String REPLAYING = "replaying";
+ public static final String NODE_NAME = "nodeName";
protected final String source;
protected final long eventNanoTime;
@@ -74,6 +78,14 @@ public abstract class TriggerEventBase implements AutoScaling.TriggerEvent {
}
@Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ ew.put("source", source);
+ ew.put("eventNanoTime", eventNanoTime);
+ ew.put("eventType", eventType.toString());
+ ew.put("properties", properties);
+ }
+
+ @Override
public String toString() {
return this.getClass().getSimpleName() + "{" +
"source='" + source + '\'' +
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
index 3bf79a6..83b30fc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -36,13 +36,7 @@ public class TriggerEventQueue extends DistributedQueue {
public boolean offerEvent(AutoScaling.TriggerEvent event) {
try {
- // yuck, serializing simple beans should be supported by Utils...
- Map<String, Object> map = new HashMap<>();
- map.put("eventType", event.getEventType().toString());
- map.put("source", event.getSource());
- map.put("eventNanoTime", event.getEventNanoTime());
- map.put("properties", event.getProperties());
- byte[] data = Utils.toJSON(map);
+ byte[] data = Utils.toJSON(event);
offer(data);
return true;
} catch (KeeperException | InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 34e8587..75cf320 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -60,6 +62,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
private static CountDownLatch actionCreated;
private static CountDownLatch triggerFiredLatch;
private static int waitForSeconds = 1;
+ private static CyclicBarrier actionStarted;
private static AtomicBoolean triggerFired;
private static AtomicReference<AutoScaling.TriggerEvent> eventRef;
@@ -78,6 +81,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
actionCreated = new CountDownLatch(1);
triggerFiredLatch = new CountDownLatch(1);
triggerFired = new AtomicBoolean(false);
+ actionStarted = new CyclicBarrier(2);
eventRef = new AtomicReference<>();
// clear any persisted auto scaling configuration
Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
@@ -103,7 +107,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'nodeAdded'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+ "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
"}}";
SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
@@ -116,7 +120,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'nodeAdded'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+ "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
"}}";
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
@@ -144,7 +148,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'nodeLost'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+ "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
"}}";
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
@@ -156,7 +160,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
"'event' : 'nodeLost'," +
"'waitFor' : '0s'," +
"'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+ "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
"}}";
req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
response = solrClient.request(req);
@@ -184,7 +188,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
static AtomicLong lastActionExecutedAt = new AtomicLong(0);
static ReentrantLock lock = new ReentrantLock();
- public static class ThrottingTesterAction extends TestTriggerAction {
+ public static class ThrottlingTesterAction extends TestTriggerAction {
// nanos are very precise so we need a delta for comparison with ms
private static final long DELTA_MS = 2;
@@ -456,4 +460,97 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
+
+ public static class TestEventQueueAction implements TriggerAction {
+
+ public TestEventQueueAction() {
+ log.info("TestEventQueueAction instantiated");
+ actionCreated.countDown();
+ }
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String getClassName() {
+ return this.getClass().getName();
+ }
+
+ @Override
+ public void process(AutoScaling.TriggerEvent event) {
+ eventRef.set(event);
+ try {
+ actionStarted.await();
+ } catch (InterruptedException | BrokenBarrierException e) {
+ throw new RuntimeException("broken barrier", e);
+ }
+ try {
+ Thread.sleep(5000);
+ triggerFired.compareAndSet(false, true);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+
+ }
+ }
+
+ @Test
+ public void testEventQueue() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger1'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
+ "}}";
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+ SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ if (!actionCreated.await(3, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ // add node to generate the event
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ int await = actionStarted.await(60, TimeUnit.SECONDS);
+ assertEquals("The trigger did not fire at all", 1, await);
+ // event should be there
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+ assertNotNull(nodeAddedEvent);
+ // but action did not complete yet so the event is still enqueued
+ assertFalse(triggerFired.get());
+ actionStarted.reset();
+ // kill overseer leader
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ Thread.sleep(5000);
+ // new overseer leader should be elected and run triggers
+ newNode = cluster.startJettySolrRunner();
+ // it should fire again but not complete yet
+ await = actionStarted.await(600, TimeUnit.SECONDS);
+ AutoScaling.TriggerEvent replayedEvent = eventRef.get();
+ }
}