You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/02/18 13:45:45 UTC

[sling-org-apache-sling-distribution-journal-it] branch master updated: SLING-9075 - Support multiple agents with one StagingPrecondition

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-it.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f090fc  SLING-9075 - Support multiple agents with one StagingPrecondition
2f090fc is described below

commit 2f090fc5c5f2e535023992d1e29bf933872eaeaa
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Feb 18 14:45:02 2020 +0100

    SLING-9075 - Support multiple agents with one StagingPrecondition
---
 .../journal/it/DistributionTestBase.java           | 42 ++++++++++++----------
 .../journal/it/DistributionTestSupport.java        |  9 +----
 .../it/tests/StagedDistributionFailureTest.java    | 30 ++++++++++------
 3 files changed, 43 insertions(+), 38 deletions(-)

diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
index 20da677..01464e2 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.it;
 
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
 
@@ -59,6 +58,7 @@ import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
 import org.awaitility.Duration;
 import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.ops4j.pax.exam.Configuration;
@@ -168,12 +168,8 @@ public class DistributionTestBase extends DistributionTestSupport {
         return queueNames;
     }
 
-    private boolean allQueuesEmpty() {
-        return queueNames().stream().allMatch(this::queueEmpty);
-    }
-
-    private boolean queueEmpty(String queueName) {
-        return agent.getQueue(queueName).getStatus().getItemsCount() == 0;
+    protected int getQueueItems(String queueName) {
+        return agent.getQueue(queueName).getStatus().getItemsCount();
     }
 
     @SuppressWarnings({ "deprecation" })
@@ -219,28 +215,36 @@ public class DistributionTestBase extends DistributionTestSupport {
             .until(() -> tryGetPath(httpPort, path), equalTo(200));
     }
 
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
     public Iterable<String> waitSubQueues(String... queues) {
-        Matcher[] matchers = Stream.of(queues).map(q -> containsString(q)).toArray(Matcher[]::new);
-
-        await().atMost(Duration.FIVE_MINUTES)
-                .until(this::queueNames, containsInAnyOrder(matchers));
-
-        Iterable<String> queueNames = agent.getQueueNames();
+        Matcher<String>[] matchers = containsNames(queues);
+        Iterable<String> queueNames = await().atMost(Duration.ONE_MINUTE)
+            .pollInterval(Duration.FIVE_SECONDS)
+            .until(this::queueNames, containsInAnyOrder(matchers));
         LOG.info("Subscriber Queues: " + String.join(", ", queueNames));
-
         return queueNames;
     }
 
+    @SuppressWarnings("unchecked")
+    private Matcher<String>[] containsNames(String... queues) {
+        return Stream.of(queues)
+                .map(name -> Matchers.containsString(name))
+                .toArray(Matcher[]::new);
+    }
+
     protected void waitEmptySubQueues() {
-        await().atMost(60, TimeUnit.SECONDS)
-                .until(this::allQueuesEmpty, equalTo(true));
+        List<String> names = queueNames();
+        for (String name : names) {
+            await("Queue " + name + "empty")
+                .atMost(60, TimeUnit.SECONDS)
+                .until(() -> getQueueItems(name), equalTo(0));
+        }
     }
 
 
     static protected void waitQueueItems(int httpPort, String agentName, int count) {
-        await().atMost(Duration.FIVE_MINUTES)
+        await("Waiting for number of items in queue.")
+                .atMost(Duration.ONE_MINUTE)
+                .pollInterval(Duration.FIVE_SECONDS)
                 .until(() -> tryGetQueueItems(httpPort, agentName), equalTo(count));
         LOG.info("Items count {} for agent {}", count, agentName + "-" + httpPort);
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
index 1223afd..17e471e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
@@ -306,7 +306,7 @@ public class DistributionTestSupport extends TestSupport {
     protected static Option publishOsgiConfigs(String agentName, boolean editable, String stage) {
 
 
-        Option subConfig = composite(
+        return composite(
                 factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
                         .put("kind", "agent")
                         .put("provider.roots", "/libs/sling/distribution/services/agents")
@@ -320,13 +320,6 @@ public class DistributionTestSupport extends TestSupport {
                         .put("editable", editable)
                         .put("announceDelay", "500")
                         .asOption());
-
-        Option condConfig =  newConfiguration("org.apache.sling.distribution.journal.impl.subscriber.StagingPrecondition")
-                .put("subAgentName", stage)
-                .asOption();
-
-        return stage != null ? composite(subConfig, condConfig) : subConfig;
-
     }
 
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
index 3e11ddb..600eb0c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
@@ -36,12 +36,12 @@ import java.io.IOException;
 @ExamReactorStrategy(PerClass.class)
 public class StagedDistributionFailureTest extends DistributionTestBase {
 
-    private static final String SUB1_AGENT = "subscriber-regular";
-    private static final String SUB2_AGENT = "subscriber-golden";
+    private static final String SUB1_AGENT = "subscriber";
+    private static final String SUB2_AGENT = "subscriber";
 
 
-    private static TestContainer publish;
-    private static TestContainer golden_publish;
+    private static volatile TestContainer publish;
+    private static volatile TestContainer golden_publish;
 
 
     private static final String TEST_PATH = "/content/mytest";
@@ -51,14 +51,16 @@ public class StagedDistributionFailureTest extends DistributionTestBase {
     public static void beforeOsgi() throws Exception {
         beforeOsgiBase();
         publish = startPublishInstance(8182,  SUB1_AGENT, false,  SUB2_AGENT);
+        new Thread(StagedDistributionFailureTest::delayedStartGoldenSubscriber).start();
+    }
 
-        new Thread(() -> {
-            // Wait for  at least one item in publish queue before starting golden publish
-            waitQueueItems(8182, SUB1_AGENT, 1);
-
-            LOG.info("Starting golden publish");
-            golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
-        }).start();
+    /**
+     * Wait for  at least one item in publish queue before starting golden publish
+     */
+    private static void delayedStartGoldenSubscriber() {
+        waitQueueItems(8182, SUB1_AGENT, 1);
+        LOG.info("Starting golden publish");
+        golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
     }
 
     @AfterOsgi
@@ -80,6 +82,12 @@ public class StagedDistributionFailureTest extends DistributionTestBase {
         waitSubQueues(SUB1_AGENT);
     }
 
+    /**
+     * Start just a regular subscriber and do a distribution. 
+     * We expect that this subscriber does not process the package as it wait on the golden subscriber.
+     * After the message is present in the queue we start the golden subscriber.
+     * Now first golden then regular subscriber should be able to process the distribution package.
+     */
     @Test
     public void testDistribute() {