You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/02/18 13:45:45 UTC
[sling-org-apache-sling-distribution-journal-it] branch master
updated: SLING-9075 - Support multiple agents with one StagingPrecondition
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-it.git
The following commit(s) were added to refs/heads/master by this push:
new 2f090fc SLING-9075 - Support multiple agents with one StagingPrecondition
2f090fc is described below
commit 2f090fc5c5f2e535023992d1e29bf933872eaeaa
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Feb 18 14:45:02 2020 +0100
SLING-9075 - Support multiple agents with one StagingPrecondition
---
.../journal/it/DistributionTestBase.java | 42 ++++++++++++----------
.../journal/it/DistributionTestSupport.java | 9 +----
.../it/tests/StagedDistributionFailureTest.java | 30 ++++++++++------
3 files changed, 43 insertions(+), 38 deletions(-)
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
index 20da677..01464e2 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.it;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
@@ -59,6 +58,7 @@ import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
import org.awaitility.Duration;
import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.ops4j.pax.exam.Configuration;
@@ -168,12 +168,8 @@ public class DistributionTestBase extends DistributionTestSupport {
return queueNames;
}
- private boolean allQueuesEmpty() {
- return queueNames().stream().allMatch(this::queueEmpty);
- }
-
- private boolean queueEmpty(String queueName) {
- return agent.getQueue(queueName).getStatus().getItemsCount() == 0;
+ protected int getQueueItems(String queueName) {
+ return agent.getQueue(queueName).getStatus().getItemsCount();
}
@SuppressWarnings({ "deprecation" })
@@ -219,28 +215,36 @@ public class DistributionTestBase extends DistributionTestSupport {
.until(() -> tryGetPath(httpPort, path), equalTo(200));
}
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
public Iterable<String> waitSubQueues(String... queues) {
- Matcher[] matchers = Stream.of(queues).map(q -> containsString(q)).toArray(Matcher[]::new);
-
- await().atMost(Duration.FIVE_MINUTES)
- .until(this::queueNames, containsInAnyOrder(matchers));
-
- Iterable<String> queueNames = agent.getQueueNames();
+ Matcher<String>[] matchers = containsNames(queues);
+ Iterable<String> queueNames = await().atMost(Duration.ONE_MINUTE)
+ .pollInterval(Duration.FIVE_SECONDS)
+ .until(this::queueNames, containsInAnyOrder(matchers));
LOG.info("Subscriber Queues: " + String.join(", ", queueNames));
-
return queueNames;
}
+ @SuppressWarnings("unchecked")
+ private Matcher<String>[] containsNames(String... queues) {
+ return Stream.of(queues)
+ .map(name -> Matchers.containsString(name))
+ .toArray(Matcher[]::new);
+ }
+
protected void waitEmptySubQueues() {
- await().atMost(60, TimeUnit.SECONDS)
- .until(this::allQueuesEmpty, equalTo(true));
+ List<String> names = queueNames();
+ for (String name : names) {
+ await("Queue " + name + "empty")
+ .atMost(60, TimeUnit.SECONDS)
+ .until(() -> getQueueItems(name), equalTo(0));
+ }
}
static protected void waitQueueItems(int httpPort, String agentName, int count) {
- await().atMost(Duration.FIVE_MINUTES)
+ await("Waiting for number of items in queue.")
+ .atMost(Duration.ONE_MINUTE)
+ .pollInterval(Duration.FIVE_SECONDS)
.until(() -> tryGetQueueItems(httpPort, agentName), equalTo(count));
LOG.info("Items count {} for agent {}", count, agentName + "-" + httpPort);
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
index 1223afd..17e471e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
@@ -306,7 +306,7 @@ public class DistributionTestSupport extends TestSupport {
protected static Option publishOsgiConfigs(String agentName, boolean editable, String stage) {
- Option subConfig = composite(
+ return composite(
factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
.put("kind", "agent")
.put("provider.roots", "/libs/sling/distribution/services/agents")
@@ -320,13 +320,6 @@ public class DistributionTestSupport extends TestSupport {
.put("editable", editable)
.put("announceDelay", "500")
.asOption());
-
- Option condConfig = newConfiguration("org.apache.sling.distribution.journal.impl.subscriber.StagingPrecondition")
- .put("subAgentName", stage)
- .asOption();
-
- return stage != null ? composite(subConfig, condConfig) : subConfig;
-
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
index 3e11ddb..600eb0c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
@@ -36,12 +36,12 @@ import java.io.IOException;
@ExamReactorStrategy(PerClass.class)
public class StagedDistributionFailureTest extends DistributionTestBase {
- private static final String SUB1_AGENT = "subscriber-regular";
- private static final String SUB2_AGENT = "subscriber-golden";
+ private static final String SUB1_AGENT = "subscriber";
+ private static final String SUB2_AGENT = "subscriber";
- private static TestContainer publish;
- private static TestContainer golden_publish;
+ private static volatile TestContainer publish;
+ private static volatile TestContainer golden_publish;
private static final String TEST_PATH = "/content/mytest";
@@ -51,14 +51,16 @@ public class StagedDistributionFailureTest extends DistributionTestBase {
public static void beforeOsgi() throws Exception {
beforeOsgiBase();
publish = startPublishInstance(8182, SUB1_AGENT, false, SUB2_AGENT);
+ new Thread(StagedDistributionFailureTest::delayedStartGoldenSubscriber).start();
+ }
- new Thread(() -> {
- // Wait for at least one item in publish queue before starting golden publish
- waitQueueItems(8182, SUB1_AGENT, 1);
-
- LOG.info("Starting golden publish");
- golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
- }).start();
+ /**
+ * Wait for at least one item in publish queue before starting golden publish
+ */
+ private static void delayedStartGoldenSubscriber() {
+ waitQueueItems(8182, SUB1_AGENT, 1);
+ LOG.info("Starting golden publish");
+ golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
}
@AfterOsgi
@@ -80,6 +82,12 @@ public class StagedDistributionFailureTest extends DistributionTestBase {
waitSubQueues(SUB1_AGENT);
}
+ /**
+ * Start just a regular subscriber and do a distribution.
+ * We expect that this subscriber does not process the package because it waits for the golden subscriber.
+ * After the message is present in the queue we start the golden subscriber.
+ * Now first the golden and then the regular subscriber should be able to process the distribution package.
+ */
@Test
public void testDistribute() {