You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/18 19:20:50 UTC

[2/2] lucene-solr:jira/solr-10515: SOLR-10515: Use MapWriter for serialization of TriggerEvents. WIP on unit testing.

SOLR-10515: Use MapWriter for serialization of TriggerEvents. WIP on unit testing.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e15ce809
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e15ce809
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e15ce809

Branch: refs/heads/jira/solr-10515
Commit: e15ce80980b854c60e2e287f0e39afcc7f3f14a5
Parents: d583dcd
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu May 18 21:19:40 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu May 18 21:19:40 2017 +0200

----------------------------------------------------------------------
 .../solr/cloud/autoscaling/AutoScaling.java     |   3 +-
 .../cloud/autoscaling/NodeAddedTrigger.java     |   1 -
 .../solr/cloud/autoscaling/NodeLostTrigger.java |   1 -
 .../autoscaling/OverseerTriggerThread.java      |   2 +-
 .../cloud/autoscaling/ScheduledTriggers.java    |   1 +
 .../cloud/autoscaling/TriggerEventBase.java     |  12 +++
 .../cloud/autoscaling/TriggerEventQueue.java    |   8 +-
 .../autoscaling/TriggerIntegrationTest.java     | 107 ++++++++++++++++++-
 8 files changed, 119 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 0b6aca3..e8d9b5e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.common.MapWriter;
 import org.apache.solr.core.CoreContainer;
 
 public class AutoScaling {
@@ -48,7 +49,7 @@ public class AutoScaling {
     AFTER_ACTION
   }
 
-  public interface TriggerEvent {
+  public interface TriggerEvent extends MapWriter {
     EventType getEventType();
 
     String getSource();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 86cc13e..dac40b5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -226,7 +226,6 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
   }
 
   public static class NodeAddedEvent extends TriggerEventBase {
-    public static final String NODE_NAME = "nodeName";
 
     public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedNanoTime, String nodeAdded) {
       super(eventType, source, nodeAddedNanoTime, Collections.singletonMap(NODE_NAME, nodeAdded));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 35c94ac..4eb71fe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -220,7 +220,6 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
   }
 
   public static class NodeLostEvent extends TriggerEventBase {
-    public static final String NODE_NAME = "nodeName";
 
     public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostNanoTime, String nodeRemoved) {
       super(eventType, source, nodeLostNanoTime, Collections.singletonMap(NODE_NAME, nodeRemoved));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 0db38fb..dcc76b4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -98,7 +98,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
     SolrZkClient zkClient = zkStateReader.getZkClient();
     createWatcher(zkClient);
 
-    while (true) {
+    while (!isClosed) {
       Map<String, AutoScaling.Trigger> copy = null;
       try {
         // this can throw InterruptedException and we don't want to unlock if it did, so we keep this outside

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 533fbc9..74b61a1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -225,6 +225,7 @@ public class ScheduledTriggers implements Closeable {
 
     @Override
     public void run() {
+      log.info("--running " + trigger.getName());
       // replay accumulated events first, if any
       AutoScaling.TriggerEvent event;
       while ((event = queue.pollEvent()) != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
index 2290e40..168dfaa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
@@ -16,14 +16,18 @@
  */
 package org.apache.solr.cloud.autoscaling;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.solr.common.MapWriter;
+
 /**
  * Base class for event implementations.
  */
 public abstract class TriggerEventBase implements AutoScaling.TriggerEvent {
   public static final String REPLAYING = "replaying";
+  public static final String NODE_NAME = "nodeName";
 
   protected final String source;
   protected final long eventNanoTime;
@@ -74,6 +78,14 @@ public abstract class TriggerEventBase implements AutoScaling.TriggerEvent {
   }
 
   @Override
+  public void writeMap(EntryWriter ew) throws IOException { // serializes this event as four entries; same keys that TriggerEventQueue.offerEvent assembled by hand before this commit
+    ew.put("source", source);
+    ew.put("eventNanoTime", eventNanoTime);
+    ew.put("eventType", eventType.toString()); // written in its toString() form rather than as the enum object
+    ew.put("properties", properties); // the properties map is passed through as a single nested value
+  }
+
+  @Override
   public String toString() {
     return this.getClass().getSimpleName() + "{" +
         "source='" + source + '\'' +

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
index 3bf79a6..83b30fc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -36,13 +36,7 @@ public class TriggerEventQueue extends DistributedQueue {
 
   public boolean offerEvent(AutoScaling.TriggerEvent event) {
     try {
-      // yuck, serializing simple beans should be supported by Utils...
-      Map<String, Object> map = new HashMap<>();
-      map.put("eventType", event.getEventType().toString());
-      map.put("source", event.getSource());
-      map.put("eventNanoTime", event.getEventNanoTime());
-      map.put("properties", event.getProperties());
-      byte[] data = Utils.toJSON(map);
+      byte[] data = Utils.toJSON(event);
       offer(data);
       return true;
     } catch (KeeperException | InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e15ce809/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 34e8587..75cf320 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -60,6 +62,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   private static CountDownLatch actionCreated;
   private static CountDownLatch triggerFiredLatch;
   private static int waitForSeconds = 1;
+  private static CyclicBarrier actionStarted;
   private static AtomicBoolean triggerFired;
   private static AtomicReference<AutoScaling.TriggerEvent> eventRef;
 
@@ -78,6 +81,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     actionCreated = new CountDownLatch(1);
     triggerFiredLatch = new CountDownLatch(1);
     triggerFired = new AtomicBoolean(false);
+    actionStarted = new CyclicBarrier(2);
     eventRef = new AtomicReference<>();
     // clear any persisted auto scaling configuration
     Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
@@ -103,7 +107,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'event' : 'nodeAdded'," +
         "'waitFor' : '0s'," +
         "'enabled' : true," +
-        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
         "}}";
     SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
     NamedList<Object> response = solrClient.request(req);
@@ -116,7 +120,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'event' : 'nodeAdded'," +
         "'waitFor' : '0s'," +
         "'enabled' : true," +
-        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
         "}}";
     req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
     response = solrClient.request(req);
@@ -144,7 +148,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'event' : 'nodeLost'," +
         "'waitFor' : '0s'," +
         "'enabled' : true," +
-        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
         "}}";
     req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
     response = solrClient.request(req);
@@ -156,7 +160,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         "'event' : 'nodeLost'," +
         "'waitFor' : '0s'," +
         "'enabled' : true," +
-        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "'actions' : [{'name':'test','class':'" + ThrottlingTesterAction.class.getName() + "'}]" +
         "}}";
     req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
     response = solrClient.request(req);
@@ -184,7 +188,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
   static AtomicLong lastActionExecutedAt = new AtomicLong(0);
   static ReentrantLock lock = new ReentrantLock();
-  public static class ThrottingTesterAction extends TestTriggerAction {
+  public static class ThrottlingTesterAction extends TestTriggerAction {
     // nanos are very precise so we need a delta for comparison with ms
     private static final long DELTA_MS = 2;
 
@@ -456,4 +460,97 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     }
   }
+
+  public static class TestEventQueueAction implements TriggerAction { // test-only action used by testEventQueue: records the event, meets the test at a barrier, then stalls
+
+    public TestEventQueueAction() {
+      log.info("TestEventQueueAction instantiated");
+      actionCreated.countDown(); // releases the test thread waiting on actionCreated
+    }
+
+    @Override
+    public String getName() {
+      return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public String getClassName() {
+      return this.getClass().getName();
+    }
+
+    @Override
+    public void process(AutoScaling.TriggerEvent event) {
+      eventRef.set(event); // publish the event before the barrier so the test can read it once await() returns
+      try {
+        actionStarted.await(); // two-party barrier (see setup): blocks until the test thread also calls await()
+      } catch (InterruptedException | BrokenBarrierException e) {
+        throw new RuntimeException("broken barrier", e);
+      }
+      try {
+        Thread.sleep(5000); // keeps the action "in progress" so the test can observe triggerFired == false
+        triggerFired.compareAndSet(false, true); // only reached if the sleep completes
+      } catch (InterruptedException e) {
+        return; // interrupted mid-sleep: triggerFired stays false; the interrupt flag is not restored here
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // no-op: empty body in this test action
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      // no-op: args are not read by this test action
+    }
+  }
+
+  @Test
+  public void testEventQueue() throws Exception { // WIP per the commit message: the replay half of this test has no assertions yet
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger1'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
+        "}}";
+    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    String overseerLeader = (String) overSeerStatus.get("leader");
+    int overseerLeaderIndex = 0; // stays 0 if no runner's node name matches the reported leader
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (jetty.getNodeName().equals(overseerLeader)) {
+        overseerLeaderIndex = i; // remember which runner to stop later
+        break;
+      }
+    }
+    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success"); // NOTE(review): JUnit's order is (expected, actual); arguments look swapped, which only affects the failure message
+
+    if (!actionCreated.await(3, TimeUnit.SECONDS))  {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    // add node to generate the event
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    int await = actionStarted.await(60, TimeUnit.SECONDS); // rendezvous with TestEventQueueAction.process(); returns this thread's arrival index
+    assertEquals("The trigger did not fire at all", 1, await); // NOTE(review): with 2 parties, index 1 means this thread arrived first; confirm the action can never reach the barrier before the test
+    // event should be there
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
+    assertNotNull(nodeAddedEvent);
+    // but action did not complete yet so the event is still enqueued
+    assertFalse(triggerFired.get());
+    actionStarted.reset(); // re-arm the barrier for the second rendezvous below
+    // kill overseer leader
+    cluster.stopJettySolrRunner(overseerLeaderIndex);
+    Thread.sleep(5000); // fixed pause before continuing; presumably to let leader election settle — TODO confirm
+    // new overseer leader should be elected and run triggers
+    newNode = cluster.startJettySolrRunner(); // newNode is assigned but not read in this method
+    // it should fire again but not complete yet
+    await = actionStarted.await(600, TimeUnit.SECONDS); // result is not asserted on
+    AutoScaling.TriggerEvent replayedEvent = eventRef.get(); // fetched but never checked: no assertion on the replayed event yet
+  }
 }