Posted to commits@sling.apache.org by tm...@apache.org on 2019/04/16 21:57:48 UTC
[sling-org-apache-sling-distribution-journal-it] 01/01: SLING-8346 - Import Journal based Sling Content Distribution source code
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-it.git
commit c1b04f2835da43d33af0d1f7793c6f175ebd4d62
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 16 23:57:13 2019 +0200
SLING-8346 - Import Journal based Sling Content Distribution source code
This project contains the integration tests of the `org.apache.sling.distribution.journal` modules
Initial contributors
* Christian Schneider @cschneider
* Timothee Maret @tmaret
* Marius Petria @mpetria
---
LICENSE | 202 ++++++++++
README.md | 13 +
pom.xml | 253 +++++++++++++
.../distribution/journal/it/ClusterIdCleaner.java | 79 ++++
.../journal/it/DistributionTestBase.java | 277 ++++++++++++++
.../journal/it/DistributionTestSupport.java | 421 +++++++++++++++++++++
.../sling/distribution/journal/it/FileUtil.java | 54 +++
.../distribution/journal/it/ext/AfterOsgi.java | 30 ++
.../distribution/journal/it/ext/BeforeOsgi.java | 30 ++
.../distribution/journal/it/ext/ExtPaxExam.java | 93 +++++
.../distribution/journal/it/kafka/KafkaLocal.java | 91 +++++
.../distribution/journal/it/kafka/KafkaRule.java | 46 +++
.../journal/it/kafka/PaxExamWithKafka.java | 46 +++
.../journal/it/kafka/ZooKeeperLocal.java | 81 ++++
.../journal/it/tests/Author2PublisherTest.java | 195 ++++++++++
.../journal/it/tests/AuthorDistributeTest.java | 191 ++++++++++
.../journal/it/tests/AuthorRestartTest.java | 178 +++++++++
.../journal/it/tests/ClearQueueItemTest.java | 277 ++++++++++++++
.../journal/it/tests/JournalAvailableTest.java | 116 ++++++
.../it/tests/LatePipeAuthorDistributeTest.java | 152 ++++++++
.../journal/it/tests/PublisherReceiveTest.java | 175 +++++++++
.../distribution/journal/it/tests/ScaleUpTest.java | 274 ++++++++++++++
.../it/tests/StagedDistributionFailureTest.java | 94 +++++
.../journal/it/tests/StagedDistributionTest.java | 86 +++++
src/test/resources/exam.properties | 21 +
src/test/resources/logback.xml | 39 ++
26 files changed, 3514 insertions(+)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..898c182
--- /dev/null
+++ b/README.md
@@ -0,0 +1,13 @@
+org.apache.sling.distribution.journal.it
+==========================================
+
+Toxiproxy
+---------
+
+Some of the tests need a local [toxiproxy installation](https://github.com/Shopify/toxiproxy#1-installing-toxiproxy).
+
+Unfortunately, the Docker version of toxiproxy does not work correctly on macOS: it needs `--net=host`, which does not work there because Docker runs in a VM. For now, install toxiproxy manually with brew and start it before running the tests.
+
+If toxiproxy is not present, the respective tests are skipped with a message that the toxiproxy server is missing.
+Because of a bug in Pax Exam, the skipped tests are simply reported as successful, so be aware that they might not check anything.
+
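The README above warns that tests depending on toxiproxy can pass without checking anything when the server is missing. A minimal sketch of how a test setup might detect that case up front is shown here. It is not part of this commit: the class name `ToxiproxyProbe` and its method are hypothetical, and the sketch assumes toxiproxy listens on its default API port 8474 on the local machine. It uses only the JDK.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of this commit.
class ToxiproxyProbe {

    // Assumption: the toxiproxy server uses its default API port.
    static final int DEFAULT_PORT = 8474;

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        boolean up = isReachable("127.0.0.1", DEFAULT_PORT, 1000);
        System.out.println(up
                ? "toxiproxy reachable"
                : "toxiproxy missing: dependent tests would not check anything");
    }
}
```

A test setup could log this result prominently, or fail fast, instead of relying on the skip being reported correctly.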
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..70dd3fa
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,253 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- ======================================================================= -->
+ <!-- P A R E N T P R O J E C T -->
+ <!-- ======================================================================= -->
+ <parent>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>sling</artifactId>
+ <version>34</version>
+ <relativePath />
+ </parent>
+ <artifactId>org.apache.sling.distribution.journal.it</artifactId>
+ <name>Apache Sling Distribution Journal - IT project</name>
+ <description>
+ Integration Tests project for the Apache Sling Distribution over journal
+ </description>
+
+ <properties>
+ <org.ops4j.pax.exam.version>4.11.0</org.ops4j.pax.exam.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.servicemix.tooling</groupId>
+ <artifactId>depends-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate-depends-file</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <!-- see IDEA-133397 -->
+ <version>2.21.0</version>
+ <configuration>
+ <redirectTestOutputToFile>false</redirectTestOutputToFile>
+ <!-- <argLine>-Xmx2048m</argLine> -->
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.api</artifactId>
+ <version>2.11.0</version>
+ </dependency>
+ <dependency>
+ <groupId>eu.rekawek.toxiproxy</groupId>
+ <artifactId>toxiproxy-java</artifactId>
+ <version>2.1.3</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.2.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.servicemix.bundles</groupId>
+ <artifactId>org.apache.servicemix.bundles.hamcrest</artifactId>
+ <version>1.3_1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jsoup</groupId>
+ <artifactId>jsoup</artifactId>
+ <version>1.11.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.mime</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.serviceusermapper</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.jcr.resource</artifactId>
+ <version>3.0.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.metrics</artifactId>
+ <version>1.2.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
+ <version>0.0.1-INCUBATOR-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.distribution.journal</artifactId>
+ <version>0.0.1-INCUBATOR-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.distribution.journal.kafka</artifactId>
+ <version>0.0.1-INCUBATOR-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-segment-tar</artifactId>
+ <version>1.8.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <scope>test</scope>
+ <version>3.2.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-store-spi</artifactId>
+ <version>1.8.2</version>
+ </dependency>
+
+ <!-- Apache Felix -->
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.framework</artifactId>
+ <version>5.6.10</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Sling Testing PaxExam -->
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.testing.paxexam</artifactId>
+ <version>1.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Pax Exam -->
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-atinject_1.0_spec</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.tinybundles</groupId>
+ <artifactId>tinybundles</artifactId>
+ <version>3.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam</artifactId>
+ <version>${org.ops4j.pax.exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-cm</artifactId>
+ <version>${org.ops4j.pax.exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-container-forked</artifactId>
+ <version>${org.ops4j.pax.exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-junit4</artifactId>
+ <version>${org.ops4j.pax.exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-link-mvn</artifactId>
+ <version>${org.ops4j.pax.exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>3.1.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.converter</artifactId>
+ <version>1.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.testing.osgi-mock</artifactId>
+ <version>2.3.4</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/ClusterIdCleaner.java b/src/test/java/org/apache/sling/distribution/journal/it/ClusterIdCleaner.java
new file mode 100644
index 0000000..377a68f
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/ClusterIdCleaner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+public class ClusterIdCleaner {
+ private NodeStore store;
+
+ public ClusterIdCleaner(File nodeStoreBase) throws InvalidFileStoreVersionException, IOException {
+ FileStore fileStore = FileStoreBuilder.fileStoreBuilder(nodeStoreBase)
+ .withStrictVersionCheck(true)
+ .build();
+ this.store = SegmentNodeStoreBuilders.builder(fileStore).build();
+ }
+
+ public ClusterIdCleaner(NodeStore store) {
+ this.store = store;
+ }
+
+ public void deleteClusterId() {
+ NodeBuilder builder = store.getRoot().builder();
+ NodeBuilder clusterConfigNode = builder.getChildNode(
+ ClusterRepositoryInfo.CLUSTER_CONFIG_NODE);
+
+ if (!clusterConfigNode.exists()) {
+ // if it doesn't exist, then there is no way to delete
+ System.out.println("clusterId was never set or already deleted.");
+ return;
+ }
+
+ if (!clusterConfigNode.hasProperty(ClusterRepositoryInfo.CLUSTER_ID_PROP)) {
+ // the config node exists, but the clusterId property does not,
+ // so again, there is nothing to delete
+ System.out.println("clusterId was never set or already deleted.");
+ return;
+ }
+ String oldClusterId = clusterConfigNode.getProperty(ClusterRepositoryInfo.CLUSTER_ID_PROP)
+ .getValue(Type.STRING);
+ clusterConfigNode.removeProperty(ClusterRepositoryInfo.CLUSTER_ID_PROP);
+ try {
+ store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ System.out.println("clusterId deleted successfully. (old id was " + oldClusterId + ")");
+ } catch (CommitFailedException e) {
+ System.err.println("Failed to delete clusterId due to exception: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
new file mode 100644
index 0000000..20da677
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+import org.awaitility.Duration;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Before;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+import org.ops4j.pax.exam.util.Filter;
+import org.ops4j.pax.exam.util.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributionTestBase extends DistributionTestSupport {
+ protected static final Logger LOG = LoggerFactory.getLogger(DistributionTestBase.class);
+
+ private static KafkaLocal kafka;
+
+ private static final String RESOURCE_TYPE = "sling:Folder";
+ private static final String PUB1_AGENT = "agent1";
+
+ @Inject
+ @Filter(value = "(name=agent1)", timeout = 40000L)
+ DistributionAgent agent;
+
+ @Inject
+ @Filter
+ Distributor distributor;
+
+ @Inject
+ ResourceResolverFactory resourceResolverFactory;
+
+ @Inject
+ MessagingProvider clientProvider;
+
+
+ @Configuration
+ public Option[] configuration() {
+ return new Option[] { //
+ //debug(),
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+ baseConfiguration(), //
+ defaultOsgiConfigs(), //
+ authorOsgiConfigs() //
+ };
+ }
+
+ public static void beforeOsgiBase() throws Exception {
+ kafka = new KafkaLocal();
+ DistributionTestSupport.createTopics();
+ }
+
+ public static void afterOsgiBase() {
+ IOUtils.closeQuietly(kafka);
+ }
+
+ @Before
+ public void beforeBase() {
+
+ }
+
+ @After
+ public void afterBase() {
+
+ }
+
+
+ public static TestContainer startPublishInstance(int httpPort, String agentName, boolean editable, String stageAgentName) {
+ ExamSystem testSystem;
+ try {
+ String workdir = String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), "publish-" + httpPort + "-" + UUID.randomUUID().toString());
+ Option[] config = CoreOptions.options( //
+ new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), //
+ defaultOsgiConfigs(), //
+ publishOsgiConfigs(agentName, editable, stageAgentName), //
+ CoreOptions.workingDirectory(workdir)
+ );
+
+ testSystem = PaxExamRuntime.createTestSystem(config);
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ TestContainer container = PaxExamRuntime.createContainer(testSystem);
+ container.start();
+ return container;
+ }
+
+
+ public void distribute(String path) {
+ try (ResourceResolver resolver = createResolver()) {
+ await().until(() -> tryDistribute(resolver, path), equalTo(true));
+ }
+ }
+
+ private boolean tryDistribute(ResourceResolver resolver, String path) {
+ DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, path);
+ DistributionResponse response = distributor.distribute(PUB1_AGENT, resolver, request);
+ LOG.info("Distribution for path {} ended with message: {} and status: {}", new Object[] { path,
+ response.getMessage(), response.isSuccessful() });
+ return response.isSuccessful();
+ }
+
+ private List<String> queueNames() {
+ List<String> queueNames = new ArrayList<>();
+ agent.getQueueNames().forEach(queueNames::add);
+ return queueNames;
+ }
+
+ private boolean allQueuesEmpty() {
+ return queueNames().stream().allMatch(this::queueEmpty);
+ }
+
+ private boolean queueEmpty(String queueName) {
+ return agent.getQueue(queueName).getStatus().getItemsCount() == 0;
+ }
+
+ @SuppressWarnings({ "deprecation" })
+ private ResourceResolver createResolver() {
+ try {
+ Map<String, Object> authinfo = new HashMap<String, Object>();
+ return resourceResolverFactory.getAdministrativeResourceResolver(authinfo);
+ } catch (LoginException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void createPath(String path) {
+ try (ResourceResolver resolver = createResolver()){
+ ResourceUtil.getOrCreateResource(resolver, path, RESOURCE_TYPE, RESOURCE_TYPE, true);
+ } catch (Exception e) {
+ LOG.error("cannot create path", e);
+ }
+ }
+
+ private static int tryGetPath(int httpPort, String path) {
+ String url = String.format("http://localhost:%s%s.json", httpPort, path);
+ HttpGet httpGet = new HttpGet(url);
+ Header authHeader = null;
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ authHeader = new BasicScheme().authenticate(new UsernamePasswordCredentials("admin", "admin"), httpGet, null);
+ httpGet.addHeader(authHeader);
+
+ try (CloseableHttpResponse response = client.execute(httpGet)) {
+ int status = response.getStatusLine().getStatusCode();
+ LOG.info("try get path {} with status {}", url, status);
+ return status;
+ }
+
+ } catch (Exception e) {
+ LOG.error("cannot get path {}", url, e);
+ }
+ return -1;
+ }
+
+ protected static void waitPath(int httpPort, String path) {
+ await().atMost(30, TimeUnit.SECONDS)
+ .until(() -> tryGetPath(httpPort, path), equalTo(200));
+ }
+
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Iterable<String> waitSubQueues(String... queues) {
+ Matcher[] matchers = Stream.of(queues).map(q -> containsString(q)).toArray(Matcher[]::new);
+
+ await().atMost(Duration.FIVE_MINUTES)
+ .until(this::queueNames, containsInAnyOrder(matchers));
+
+ Iterable<String> queueNames = agent.getQueueNames();
+ LOG.info("Subscriber Queues: {}", String.join(", ", queueNames));
+
+ return queueNames;
+ }
+
+ protected void waitEmptySubQueues() {
+ await().atMost(60, TimeUnit.SECONDS)
+ .until(this::allQueuesEmpty, equalTo(true));
+ }
+
+
+ protected static void waitQueueItems(int httpPort, String agentName, int count) {
+ await().atMost(Duration.FIVE_MINUTES)
+ .until(() -> tryGetQueueItems(httpPort, agentName), equalTo(count));
+ LOG.info("Items count {} for agent {}", count, agentName + "-" + httpPort);
+
+ }
+
+ private static int tryGetQueueItems(int httpPort, String agentName) {
+ String url = String.format("http://localhost:%s/libs/sling/distribution/services/agents/%s/queues.2.json", httpPort, agentName);
+ HttpGet httpGet = new HttpGet(url);
+ Header authHeader = null;
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ authHeader = new BasicScheme().authenticate(new UsernamePasswordCredentials("admin", "admin"), httpGet, null);
+ httpGet.addHeader(authHeader);
+
+ try (CloseableHttpResponse response = client.execute(httpGet)) {
+ String text = IOUtils.toString(response.getEntity().getContent(), Charset.defaultCharset());
+ if (text == null) {
+ return -1;
+ }
+
+ String itemsCount = StringUtils.substringBetween(text, "itemsCount\":", ",");
+ if (itemsCount == null) {
+ return -1;
+ }
+
+ return Integer.parseInt(itemsCount.trim());
+ }
+ } catch (Throwable e) {
+ LOG.error("cannot get items count {}", url, e);
+ }
+ return -1;
+ }
+
+
+}
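Review note on tryGetQueueItems: the count is cut out of the response with StringUtils.substringBetween(text, "itemsCount\":", ","), which depends on a comma following the value. If itemsCount happens to be the last property of its JSON object, the call can return null or a fragment that is not a number. Below is a minimal sketch of a more tolerant extraction using only java.util.regex. The class name QueueItemsParser and the method firstItemsCount are hypothetical and not part of this commit, and the JSON strings used with them are illustrative, not captured from a Sling instance.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the commit.
final class QueueItemsParser {

    // Matches "itemsCount": <digits> regardless of what follows the number
    private static final Pattern ITEMS_COUNT =
            Pattern.compile("\"itemsCount\"\\s*:\\s*(\\d+)");

    private QueueItemsParser() {
    }

    /** Returns the first itemsCount found in the text, or -1 if none is present. */
    static int firstItemsCount(String json) {
        if (json == null) {
            return -1;
        }
        Matcher matcher = ITEMS_COUNT.matcher(json);
        return matcher.find() ? Integer.parseInt(matcher.group(1)) : -1;
    }
}
```

Like the original, this only looks at the first queue in the response. A JSON parser would be the more robust choice if one is already on the test classpath.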
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
new file mode 100644
index 0000000..85ae96c
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+
+import static java.lang.Boolean.getBoolean;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.admin.AdminClient.create;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.sling.testing.paxexam.SlingOptions.slingDistribution;
+import static org.apache.sling.testing.paxexam.SlingOptions.slingQuickstartOak;
+import static org.ops4j.pax.exam.Constants.START_LEVEL_SYSTEM_BUNDLES;
+import static org.ops4j.pax.exam.CoreOptions.bootDelegationPackage;
+import static org.ops4j.pax.exam.CoreOptions.bundle;
+import static org.ops4j.pax.exam.CoreOptions.composite;
+import static org.ops4j.pax.exam.CoreOptions.frameworkStartLevel;
+import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+import static org.ops4j.pax.exam.CoreOptions.url;
+import static org.ops4j.pax.exam.CoreOptions.vmOption;
+import static org.ops4j.pax.exam.CoreOptions.when;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.factoryConfiguration;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.io.File;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.kafka.KafkaClientProvider;
+import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
+import org.apache.sling.distribution.serialization.impl.vlt.VaultDistributionPackageBuilderFactory;
+import org.apache.sling.testing.paxexam.SlingOptions;
+import org.apache.sling.testing.paxexam.TestSupport;
+import org.ops4j.pax.exam.ConfigurationManager;
+import org.ops4j.pax.exam.Constants;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.options.CompositeOption;
+import org.ops4j.pax.exam.options.DefaultCompositeOption;
+import org.ops4j.pax.exam.options.libraries.JUnitBundlesOption;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationAdmin;
+
+import com.google.common.collect.ImmutableMap;
+
+public class DistributionTestSupport extends TestSupport {
+ public static final String TOPIC_PACKAGE = "aemdistribution_package";
+ public static final String TOPIC_DISCOVERY = "aemdistribution_discovery";
+ public static final String TOPIC_COMMAND = "aemdistribution_command";
+ public static final String TOPIC_STATUS = "aemdistribution_status";
+ public static final String TOPIC_EVENT = "aemdistribution_event";
+
+ @Inject
+ protected BundleContext bundleContext;
+
+ @Inject
+ protected ConfigurationAdmin configAdmin;
+
+ private int httpPort = 8181;
+
+ public DistributionTestSupport withHttpPort(int httpPort) {
+ this.httpPort = httpPort;
+ return this;
+ }
+
+ public Option baseConfiguration() {
+ String workingDirectory = workingDirectory();
+ FileUtil.deleteDir(new File(workingDirectory));
+ return baseConfiguration(workingDirectory);
+ }
+
+ public Option baseConfiguration(String baseDirectory) {
+ // Patch versions of features provided by SlingOptions
+ SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.commons.mime");
+ SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.commons.metrics");
+ SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.core");
+ SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal");
+ SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal.messages");
+ SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal.kafka");
+ SlingOptions.versionResolver.setVersionFromProject("io.dropwizard.metrics", "metrics-core");
+ SlingOptions.versionResolver.setVersion("org.slf4j", "log4j-over-slf4j", "1.7.6");
+
+ Option baseOptions = composite(
+ super.baseConfiguration(),
+ SlingOptions.logback(),
+ mavenBundle().groupId("org.slf4j").artifactId("log4j-over-slf4j").version(SlingOptions.versionResolver),
+
+ // The base sling Quickstart
+ slingQuickstart(baseDirectory),
+ mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.webconsole.plugins.ds").version(SlingOptions.versionResolver),
+ mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.metrics").version(SlingOptions.versionResolver),
+
+ mvn("com.google.protobuf", "protobuf-java"),
+ kafka(),
+
+ // The bundle built (org.apache.sling.distribution.journal)
+ mvn("org.apache.sling", "org.apache.sling.distribution.journal"),
+ mvn("org.apache.sling", "org.apache.sling.distribution.journal.messages"),
+ mvn("org.apache.sling", "org.apache.sling.distribution.journal.kafka"),
+
+ // distribution bundles
+ slingDistribution(),
+
+ // testing
+ //slingResourcePresence(), // see https://github.com/apache/sling-org-apache-sling-resource-presence
+ jsoup(),
+ myJunitBundles()
+
+ );
+
+ // Remote debugger on the forked JVM
+ // Run with mvn install -DisDebugEnabled=true
+ return getBoolean("isDebugEnabled")
+ ? composite(remoteDebug(), baseOptions)
+ : baseOptions;
+ }
+
+ public static CompositeOption myJunitBundles() {
+ return new DefaultCompositeOption(
+ composite(defaultTestSystemOptions()),
+ new JUnitBundlesOption(),
+ systemProperty("pax.exam.invoker").value("junit"),
+ mvn("org.mockito", "mockito-all"),
+ mvn("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.hamcrest"),
+ mvn("org.awaitility", "awaitility"),
+ bundle("link:classpath:META-INF/links/org.ops4j.pax.exam.invoker.junit.link"));
+ }
+
+ /**
+ * Dependencies for mockito 2
+ */
+ public static Option mockito2() {
+ return composite(
+ mvn("org.objenesis", "objenesis"),
+ mvn("net.bytebuddy", "byte-buddy"),
+ mvn("net.bytebuddy", "byte-buddy-agent"),
+ mvn("org.mockito", "mockito-core")
+ );
+ }
+
+ /**
+ * Standard test system options with only atinject removed, as it collides with
+ * the one provided in Sling
+ */
+ private static Option[] defaultTestSystemOptions() {
+ ConfigurationManager cm = new ConfigurationManager();
+ String logging = cm.getProperty(Constants.EXAM_LOGGING_KEY,
+ Constants.EXAM_LOGGING_PAX_LOGGING);
+
+ return new Option[] {
+ bootDelegationPackage("sun.*"),
+ frameworkStartLevel(Constants.START_LEVEL_TEST_BUNDLE),
+ url("link:classpath:META-INF/links/org.ops4j.pax.exam.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ url("link:classpath:META-INF/links/org.ops4j.pax.exam.inject.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ url("link:classpath:META-INF/links/org.ops4j.pax.extender.service.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ url("link:classpath:META-INF/links/org.osgi.compendium.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+
+ when(logging.equals(Constants.EXAM_LOGGING_PAX_LOGGING)).useOptions(
+ url("link:classpath:META-INF/links/org.ops4j.pax.logging.api.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES)),
+
+ url("link:classpath:META-INF/links/org.ops4j.base.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.core.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.extender.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.framework.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.lifecycle.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.tracker.link").startLevel(
+ START_LEVEL_SYSTEM_BUNDLES),
+ /*
+ url("link:classpath:META-INF/links/org.apache.geronimo.specs.atinject.link")
+ .startLevel(START_LEVEL_SYSTEM_BUNDLES) };
+ */
+ };
+ }
+
+ protected Option slingQuickstart(String baseDirectory) {
+ final String workingDirectory = String.format("%s/instance", baseDirectory);
+ final String dataStoreDirectory = String.format("%s/shareddatastore", baseDirectory);
+ System.out.println(String.format("Quickstart with workingDirectory %s, port %s", workingDirectory, httpPort));
+ return slingQuickstartOakSharedBlobstore(workingDirectory, dataStoreDirectory, httpPort);
+ }
+
+ public static Option defaultOsgiConfigs() {
+ return defaultOsgiConfigs("");
+ }
+
+ /**
+ * OSGI configurations targeted to author and publish instances
+ */
+ protected static Option defaultOsgiConfigs(String journalEndpoint) {
+ return composite(
+ newConfiguration("org.apache.sling.jcr.resource.internal.JcrSystemUserValidator")
+ .put("allow.only.system.user", false).asOption(),
+
+ // For production the users would be: replication-service,content-writer-service
+ factoryConfiguration("org.apache.sling.serviceusermapping.impl.ServiceUserMapperImpl.amended")
+ .put("user.mapping", new String[]{"org.apache.sling.distribution.journal:bookkeeper=admin","org.apache.sling.distribution.journal:importer=admin"})
+ .asOption(),
+
+ factoryConfiguration(VaultDistributionPackageBuilderFactory.class.getName())
+ .put("name", "journal")
+ .put("type", "inmemory")
+ .put("useBinaryReferences", "true")
+ .put("aclHandling", "IGNORE")
+ .put("package.filters", new String[]{"/home/users|-.*/.tokens", "/home/users|-.*/rep:cache"})
+ .put("property.filters", new String[]{"/|-^.*/cq:lastReplicated|-^.*/cq:lastReplicatedBy|-^.*/cq:lastReplicationAction"})
+ .asOption(),
+
+ newConfiguration("org.apache.sling.distribution.journal.kafka.KafkaClientProvider")
+ .asOption(),
+
+ newConfiguration("org.apache.sling.distribution.component.impl.DistributionComponentFactoryMap")
+ .put("mapping.agent", new String[]{//
+ "pub:org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"})
+ .asOption(),
+
+ newConfiguration("org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker")
+ .put("scheduler.period", 1L)
+ .asOption()
+
+ );
+
+ }
+
+ /**
+ * OSGI configurations targeted to the author instances only
+ */
+ public static Option authorOsgiConfigs() {
+ return composite(
+ factoryConfiguration("org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory")
+ .put("name", "agent1")
+ .put("packageBuilder.target", "(name=journal)")
+ .asOption(),
+ factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
+ .put("kind", "agent")
+ .put("provider.roots", "/libs/sling/distribution/services/agents")
+ .asOption(),
+ factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
+ .put("kind", "exporter")
+ .put("provider.roots", "/libs/sling/distribution/services/exporters")
+ .asOption(),
+ factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
+ .put("kind", "importer")
+ .put("provider.roots", "/libs/sling/distribution/services/importers")
+ .asOption()
+ );
+
+ }
+
+ protected static Option publishOsgiConfigs() {
+ return publishOsgiConfigs("subscriber-agent1");
+ }
+
+ /**
+ * OSGi configuration targeted to the publish instances only
+ */
+ public static Option publishOsgiConfigs(String agentName) {
+ return publishOsgiConfigs(agentName, true, null);
+
+ }
+
+ protected static Option publishOsgiConfigs(String agentName, boolean editable, String stage) {
+
+
+ Option subConfig = composite(
+ factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
+ .put("kind", "agent")
+ .put("provider.roots", "/libs/sling/distribution/services/agents")
+ .asOption(),
+
+ factoryConfiguration("org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+ .put("name", agentName)
+ .put("agentNames", new String[]{"agent1"})
+ .put("packageBuilder.target", "(name=journal)")
+ .put("precondition.target", stage != null ? "(name=staging)" : "(name=default)")
+ .put("editable", editable)
+ .put("announceDelay", "500")
+ .asOption());
+
+ Option condConfig = newConfiguration("org.apache.sling.distribution.journal.impl.subscriber.StagingPrecondition")
+ .put("subAgentName", stage)
+ .asOption();
+
+ return stage != null ? composite(subConfig, condConfig) : subConfig;
+
+ }
+
+
+ protected static Option remoteDebug() {
+ return remoteDebug(5005);
+ }
+
+ protected static Option remoteDebug(int debugPort) {
+ System.out.println(String.format("Remote debugger on port: %s", debugPort));
+ return vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort);
+ }
+
+ private static Option slingQuickstartOakSharedBlobstore(String workingDirectory, String dataStoreDirectory, int httpPort) {
+
+ // Option for a quickstart running an Oak repository with a nodestore and a shared data store.
+
+ String slingHome = String.format("%s/sling", workingDirectory);
+ String repositoryHome = String.format("%s/repository", slingHome);
+ String localIndexDir = String.format("%s/index", repositoryHome);
+ return composite(
+
+ slingQuickstartOak(),
+
+ mavenBundle()
+ .groupId("org.apache.jackrabbit")
+ .artifactId("oak-lucene")
+ .version(SlingOptions.versionResolver),
+
+ mavenBundle()
+ .groupId("org.apache.jackrabbit")
+ .artifactId("oak-segment-tar")
+ .version(SlingOptions.versionResolver),
+
+ mavenBundle()
+ .groupId("org.apache.sling")
+ .artifactId("org.apache.sling.jcr.oak.server")
+ .version(SlingOptions.versionResolver),
+
+ newConfiguration("org.apache.felix.http")
+ .put("org.osgi.service.http.port", httpPort).asOption(),
+
+ newConfiguration("org.apache.jackrabbit.oak.segment.SegmentNodeStoreService")
+ .put("customBlobStore", true)
+ .put("repository.home", repositoryHome)
+ .put("name", "NodeStore with custom blob store").asOption(),
+
+ newConfiguration("org.apache.jackrabbit.oak.plugins.blob.datastore.FileDataStore")
+ .put("path", dataStoreDirectory)
+ .put("minRecordLength", 16384).asOption(),
+
+ newConfiguration("org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexProviderService")
+ .put("localIndexDir", localIndexDir).asOption()
+ );
+ }
+
+ private static Option kafka() {
+ return composite(
+ mvn("com.fasterxml.jackson.core", "jackson-core"),
+ mvn("com.fasterxml.jackson.core", "jackson-annotations"),
+ mvn("com.fasterxml.jackson.core", "jackson-databind"),
+ mvn("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.kafka-clients")
+ );
+ }
+
+ public static Option mvn(String groupId, String artifactId) {
+ return mavenBundle().groupId(groupId).artifactId(artifactId).versionAsInProject();
+ }
+
+ private static Option jsoup() {
+ return mavenBundle().groupId("org.jsoup").artifactId("jsoup").versionAsInProject();
+ }
+
+ public static void createTopic(String topicName) {
+ NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
+ try (AdminClient admin = create(singletonMap(BOOTSTRAP_SERVERS_CONFIG,
+ standardConverter().convert(kafkaProperties()).to(KafkaEndpoint.class).kafkaBootstrapServers()))) {
+ CreateTopicsResult result = admin.createTopics(singletonList(newTopic));
+ result.values().get(topicName).get();
+ } catch (Exception e) {
+ throw new RuntimeException(format("Failed to create topic %s", topicName), e);
+ }
+ }
+
+ public static void createTopics() {
+ createTopic(TOPIC_DISCOVERY);
+ createTopic(TOPIC_PACKAGE);
+ createTopic(TOPIC_COMMAND);
+ createTopic(TOPIC_STATUS);
+ createTopic(TOPIC_EVENT);
+ }
+
+ public static MessagingProvider createProvider() {
+ KafkaClientProvider provider = new KafkaClientProvider();
+ provider.activate(standardConverter().convert(kafkaProperties()).to(KafkaEndpoint.class));
+ return provider;
+ }
+
+ private static Map<String,String> kafkaProperties() {
+ return ImmutableMap.of(
+ "kafkaDefaultApiTimeout", "5000",
+ "kafkaConnectTimeout", "32000");
+ }
+
+}
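Review note on remoteDebug(int debugPort): the method prints the port it was given, so the JDWP argument handed to the forked JVM is expected to carry that same port. A minimal sketch of building the argument from the parameter follows. The class RemoteDebugSketch and the method jdwpOption are hypothetical names used only for illustration, and the range check is an added assumption rather than something the commit does.

```java
// Hypothetical helper, not part of the commit.
final class RemoteDebugSketch {

    private RemoteDebugSketch() {
    }

    /** Builds the JDWP agent argument for the given TCP port. */
    static String jdwpOption(int debugPort) {
        if (debugPort < 1 || debugPort > 65535) {
            throw new IllegalArgumentException("Invalid debug port: " + debugPort);
        }
        // Plain concatenation keeps the digits locale-independent
        return "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort;
    }
}
```

The resulting string would be passed to vmOption(...) in the same way as the existing code does.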
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/FileUtil.java b/src/test/java/org/apache/sling/distribution/journal/it/FileUtil.java
new file mode 100644
index 0000000..a40b327
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/FileUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+
+public class FileUtil {
+ private FileUtil() {
+ }
+
+ public static void copyFolder(Path src, Path dest) throws IOException {
+ Files.walk(src).forEach(source -> copy(source, dest.resolve(src.relativize(source))));
+ }
+
+ private static void copy(Path source, Path dest) {
+ try {
+ Files.copy(source, dest, REPLACE_EXISTING);
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+
+ public static void deleteDir(File dir) {
+ try {
+ FileUtils.deleteDirectory(dir);
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+}
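Review note on FileUtil.copyFolder: Files.walk returns a stream that holds open directory handles, and its Javadoc recommends closing it with try-with-resources. The sketch below shows one way to do that while keeping the recursive copy behaviour. It is an illustration under assumptions, not the commit's code: the class CopyFolderSketch, the return value and the demo() method are hypothetical, and directories are created with Files.createDirectories instead of being copied.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

// Hypothetical variant of copyFolder, not part of the commit.
final class CopyFolderSketch {

    private CopyFolderSketch() {
    }

    /** Recursively copies src into dest; returns the number of regular files copied. */
    static int copyFolder(Path src, Path dest) {
        int copied = 0;
        // try-with-resources releases the directory handles held by Files.walk
        try (Stream<Path> paths = Files.walk(src)) {
            for (Path source : (Iterable<Path>) paths::iterator) {
                Path target = dest.resolve(src.relativize(source));
                if (Files.isDirectory(source)) {
                    // Files.walk visits a directory before its entries
                    Files.createDirectories(target);
                } else {
                    Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
                    copied++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copied;
    }

    /** Demo on throwaway temp directories; returns the copied file count, or -1 on a missing file. */
    static int demo() {
        try {
            Path src = Files.createTempDirectory("copy-src");
            Path dest = Files.createTempDirectory("copy-dest");
            Files.createDirectories(src.resolve("sub"));
            Files.write(src.resolve("a.txt"), "a".getBytes(StandardCharsets.UTF_8));
            Files.write(src.resolve("sub").resolve("b.txt"), "b".getBytes(StandardCharsets.UTF_8));
            int copied = copyFolder(src, dest);
            return Files.exists(dest.resolve("sub").resolve("b.txt")) ? copied : -1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```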
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/ext/AfterOsgi.java b/src/test/java/org/apache/sling/distribution/journal/it/ext/AfterOsgi.java
new file mode 100644
index 0000000..da980b4
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/ext/AfterOsgi.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.ext;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AfterOsgi {
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/ext/BeforeOsgi.java b/src/test/java/org/apache/sling/distribution/journal/it/ext/BeforeOsgi.java
new file mode 100644
index 0000000..c504cd2
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/ext/BeforeOsgi.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.ext;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface BeforeOsgi {
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/ext/ExtPaxExam.java b/src/test/java/org/apache/sling/distribution/journal/it/ext/ExtPaxExam.java
new file mode 100644
index 0000000..33603d3
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/ext/ExtPaxExam.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.ext;
+
+import static java.util.Arrays.asList;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Optional;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.model.InitializationError;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+/**
+ * Shows how to extend the PaxExam runner to implement hooks that are executed outside of OSGi in the parent
+ * JUnit process
+ */
+public class ExtPaxExam extends PaxExam {
+
+ private Optional<Method> before;
+ private Optional<Method> after;
+
+ public ExtPaxExam(Class<?> klass) throws InitializationError {
+ super(klass);
+ this.before = getStaticMethodWith(klass, BeforeOsgi.class);
+ this.after = getStaticMethodWith(klass, AfterOsgi.class);
+ }
+
+ @Override
+ public void run(RunNotifier notifier) {
+ /*
+ * If a test is run more than once from the IDE, the pax exam dir is not deleted by default.
+ * This causes bundles to be installed multiple times, so the pax exam dir is deleted up front.
+ */
+ deleteDir(new File("target/paxexam"));
+ this.before.ifPresent(this::invoke);
+ try {
+ super.run(notifier);
+ } finally {
+ this.after.ifPresent(this::invoke);
+ }
+ }
+
+ private void deleteDir(File dir) {
+ try {
+ FileUtils.deleteDirectory(dir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private Optional<Method> getStaticMethodWith(Class<?> klass, Class<? extends Annotation> annotation) {
+ Optional<Method> foundMethod = asList(klass.getMethods()).stream()
+ .filter(method -> method.getAnnotation(annotation) != null)
+ .findFirst();
+ if (foundMethod.isPresent()) {
+ Method m = foundMethod.get();
+ if (!Modifier.isStatic(m.getModifiers())) {
+ throw new IllegalStateException("Method " + m.getName() + " must be static to be used as " + annotation.getName());
+ }
+ }
+ return foundMethod;
+ }
+
+ private void invoke(Method method) {
+ try {
+ method.invoke(null);
+ } catch (Exception e) {
+ throw new RuntimeException("Error calling method " + method, e);
+ }
+ }
+}
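Review note on ExtPaxExam: the runner looks up a single public method carrying @BeforeOsgi or @AfterOsgi, insists that it is static, and invokes it reflectively outside the OSGi container. The lookup can be sketched with the standard library alone, as below. All names here (HookLookupSketch, BeforeHook, GoodTest, BadTest) are hypothetical stand-ins so the example does not need Pax Exam or JUnit on the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Optional;

// Hypothetical stand-in for the hook lookup, not part of the commit.
final class HookLookupSketch {

    private HookLookupSketch() {
    }

    /** Plays the role of @BeforeOsgi; RUNTIME retention makes it visible to reflection. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface BeforeHook {
    }

    /** Test class with a valid static hook. */
    public static class GoodTest {
        static int calls;

        @BeforeHook
        public static void startBroker() {
            calls++;
        }
    }

    /** Test class whose hook is wrongly declared as an instance method. */
    public static class BadTest {
        @BeforeHook
        public void startBroker() {
        }
    }

    /** Finds the first public annotated method and rejects it if it is not static. */
    static Optional<Method> findStaticHook(Class<?> klass) {
        Optional<Method> found = Arrays.stream(klass.getMethods())
                .filter(m -> m.isAnnotationPresent(BeforeHook.class))
                .findFirst();
        found.ifPresent(m -> {
            if (!Modifier.isStatic(m.getModifiers())) {
                throw new IllegalStateException("Method " + m.getName() + " must be static");
            }
        });
        return found;
    }

    /** Invokes the hook on GoodTest and returns how many times it ran. */
    static int runGoodHook() {
        int before = GoodTest.calls;
        findStaticHook(GoodTest.class).ifPresent(m -> {
            try {
                m.invoke(null); // static method, so no receiver instance
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Error calling method " + m, e);
            }
        });
        return GoodTest.calls - before;
    }
}
```

Because Class.getMethods() only returns public methods, a hook that is not public is silently skipped, which also applies to the runner in this commit.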
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaLocal.java b/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaLocal.java
new file mode 100644
index 0000000..658f075
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaLocal.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.VerifiableProperties;
+import scala.Some;
+import scala.collection.Seq;
+
+public class KafkaLocal implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaLocal.class);
+
+ public KafkaServer kafka;
+ public ZooKeeperLocal zookeeper;
+
+ public KafkaLocal() throws Exception {
+ this(kafkaProperties(), zookeeperProperties());
+ }
+
+ public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws Exception {
+ zookeeper = new ZooKeeperLocal(zkProperties);
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+ Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(kafkaProperties));
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, new Some<String>("kafka"), reporters);
+ kafka.startup();
+ }
+
+ @Override
+ public void close() throws IOException {
+ System.out.println("stopping kafka...");
+ try {
+ kafka.shutdown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ zookeeper.close();
+ System.out.println("done");
+ }
+
+ private static Properties kafkaProperties() {
+ String logDir = "target/kafka-" + UUID.randomUUID().toString();
+ Properties kafkaProps = new Properties();
+ kafkaProps.put("zookeeper.connect", "localhost:2181");
+ kafkaProps.put("advertised.host.name", "localhost");
+ kafkaProps.put("host.name","localhost");
+ kafkaProps.put("port", "9092");
+ kafkaProps.put("auto.create.topics.enable", false);
+ kafkaProps.put("broker.id", "0");
+ kafkaProps.put("log.dir",logDir);
+ kafkaProps.put("group.initial.rebalance.delay.ms", "0");
+ kafkaProps.put("group.min.session.timeout.ms", "1000");
+ return kafkaProps;
+ }
+
+ private static Properties zookeeperProperties() {
+ Properties zkProps = new Properties();
+ UUID uuid = UUID.randomUUID();
+ zkProps.put("dataDir", "target/zookeeper/"+uuid.toString());
+ zkProps.put("clientPort", "2181");
+ return zkProps;
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaRule.java
new file mode 100644
index 0000000..afc1291
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.kafka;
+
+import java.io.IOException;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class KafkaRule implements TestRule {
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ return new Statement() {
+
+ @Override
+ public void evaluate() throws Throwable {
+ runWithKafka(base);
+ }
+ };
+ }
+
+ private void runWithKafka(Statement base) throws Throwable {
+ try (KafkaLocal kafka = new KafkaLocal()) {
+ base.evaluate();
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/kafka/PaxExamWithKafka.java b/src/test/java/org/apache/sling/distribution/journal/it/kafka/PaxExamWithKafka.java
new file mode 100644
index 0000000..40c6365
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/kafka/PaxExamWithKafka.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.kafka;
+
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.model.InitializationError;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaxExamWithKafka extends PaxExam {
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ public PaxExamWithKafka(Class<?> klass) throws InitializationError {
+ super(klass);
+ }
+
+ @Override
+ public void run(RunNotifier notifier) {
+ try (KafkaLocal kafka = new KafkaLocal()){
+ DistributionTestSupport.createTopics();
+ super.run(notifier);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/kafka/ZooKeeperLocal.java b/src/test/java/org/apache/sling/distribution/journal/it/kafka/ZooKeeperLocal.java
new file mode 100644
index 0000000..98a9cf4
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/kafka/ZooKeeperLocal.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.kafka;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperLocal implements Closeable {
+ static Logger LOG = LoggerFactory.getLogger(ZooKeeperLocal.class);
+ MyZooKeeperServerMain zooKeeperServer;
+
+ public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException, ConfigException {
+ QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+ quorumConfiguration.parseProperties(zkProperties);
+ zooKeeperServer = new MyZooKeeperServerMain(quorumConfiguration);
+
+ new Thread() {
+ public void run() {
+ try {
+ zooKeeperServer.startup();
+ } catch (IOException e) {
+ System.out.println("ZooKeeper Failed");
+ e.printStackTrace(System.err);
+ }
+ }
+ }.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ zooKeeperServer.shutdown();
+ }
+
+ static class MyZooKeeperServerMain extends ZooKeeperServerMain {
+
+ private QuorumPeerConfig config;
+
+ MyZooKeeperServerMain(QuorumPeerConfig config) {
+ this.config = config;
+ }
+
+ public void startup() throws IOException {
+ ServerConfig serverConfig = new ServerConfig();
+ serverConfig.readFrom(config);
+ runFromConfig(serverConfig);
+ }
+
+ public void shutdown() {
+ try {
+ super.shutdown();
+ } catch (Exception e) {
+ LOG.error("Error shutting down ZooKeeper", e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/Author2PublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/Author2PublisherTest.java
new file mode 100644
index 0000000..7dc5f95
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/Author2PublisherTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.ops4j.pax.exam.util.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
+import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
+import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+
+/**
+ * Starts one author instance and two publisher instances.
+ * The author instance also runs the test code.
+ * The test triggers a content distribution and checks that it is correctly processed by
+ * both publisher instances.
+ */
+@RunWith(ExtPaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class Author2PublisherTest extends DistributionTestSupport {
+ private static final String PUB1_AGENT = "agent1";
+ private static final String SUB1_AGENT = "subscriber-agent1";
+ private static final String SUB2_AGENT = "subscriber-agent2";
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Inject
+ @Filter(value = "(name=agent1)", timeout = 40000L)
+ DistributionAgent agent;
+
+ @Inject
+ @Filter
+ Distributor distributor;
+
+ @Inject
+ @Filter
+ ResourceResolverFactory resourceResolverFactory;
+
+ @Inject
+ MessagingProvider clientProvider;
+
+ private static TestContainer publisher;
+
+ private static TestContainer publisher2;
+ private static KafkaLocal kafka;
+
+ @Configuration
+ public Option[] configuration() {
+ return new Option[] { //
+ //debug(),
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+ baseConfiguration(), //
+ defaultOsgiConfigs(), //
+ authorOsgiConfigs() //
+ };
+ }
+
+ public Option debug() {
+ return CoreOptions.vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+ }
+
+ @BeforeOsgi
+ public static void startPublishers() throws Exception {
+ kafka = new KafkaLocal();
+ DistributionTestSupport.createTopics();
+ publisher = startPublisher(publisherConfig(8182, "Author2PublisherTest.publisher1", SUB1_AGENT));
+ publisher2 = startPublisher(publisherConfig(8183, "Author2PublisherTest.publisher2", SUB2_AGENT));
+ }
+
+ private static TestContainer startPublisher(Option[] config) {
+ ExamSystem testSystem;
+ try {
+ testSystem = PaxExamRuntime.createTestSystem(config);
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ TestContainer container = PaxExamRuntime.createContainer(testSystem);
+ container.start();
+ //new Thread(container::start).start();
+ return container;
+ }
+
+ @AfterOsgi
+ public static void stopPublishers() throws IOException {
+ if (publisher != null) {
+ publisher.stop();
+ }
+ if (publisher2 != null) {
+ publisher2.stop();
+ }
+ closeQuietly(kafka);
+ }
+
+ private static Option[] publisherConfig(int httpPort, String instanceName, String agentName) {
+ String workdir = String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), instanceName);
+ return CoreOptions.options( //
+ new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), //
+ defaultOsgiConfigs(), //
+ publishOsgiConfigs(agentName), //
+ CoreOptions.workingDirectory(workdir)
+ );
+ }
+
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ @Test
+ public void testDistribute() throws Exception {
+ Map<String, Object> authinfo = new HashMap<String, Object>();
+ try (ResourceResolver resolver = resourceResolverFactory.getAdministrativeResourceResolver(authinfo)) {
+ await().until(() -> distribute(resolver), equalTo(true));
+ }
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(this::queueNames, containsInAnyOrder(containsString(SUB1_AGENT), containsString(SUB2_AGENT)));
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(this::allQueuesEmpty, equalTo(true));
+ log.info("Queue names: {}", agent.getQueueNames());
+ }
+
+ private boolean distribute(ResourceResolver resolver) {
+ DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
+ DistributionResponse response = distributor.distribute(PUB1_AGENT, resolver, request);
+ log.info(response.getMessage());
+ return response.isSuccessful();
+ }
+
+ private List<String> queueNames() {
+ List<String> queueNames = new ArrayList<>();
+ agent.getQueueNames().forEach(queueNames::add);
+ return queueNames;
+ }
+
+ private boolean allQueuesEmpty() {
+ return queueNames().stream().allMatch(this::queueEmpty);
+ }
+
+ private boolean queueEmpty(String queueName) {
+ return agent.getQueue(queueName).getStatus().getItemsCount() == 0;
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
new file mode 100644
index 0000000..c45b199
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import javax.inject.Inject;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts an author instance, triggers a content distribution and checks that the package arrives
+ * on the journal.
+ */
+@RunWith(PaxExamWithKafka.class)
+@ExamReactorStrategy(PerClass.class)
+public class AuthorDistributeTest extends DistributionTestSupport {
+ private static final String PUB1_AGENT = "agent1";
+ private static final String SUB1_SLING_ID = UUID.randomUUID().toString();
+ private static final String SUB1_AGENT = "sub1agent";
+ private static final String QUEUE_NAME = SUB1_SLING_ID + "-" + SUB1_AGENT;
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Inject
+ @Filter(value = "(name=agent1)", timeout = 40000L)
+ DistributionAgent agent;
+
+ @Inject
+ @Filter
+ Distributor distributor;
+
+ @Inject
+ @Filter
+ ResourceResolverFactory resourceResolverFactory;
+
+ @Inject
+ MessagingProvider clientProvider;
+
+ private AtomicReference<PackageMessage> recordedPackage = new AtomicReference<PackageMessage>();
+
+ private Semaphore messageSem = new Semaphore(0);
+
+ @Configuration
+ public Option[] configuration() {
+ return new Option[] { //
+ //debug(),
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+ baseConfiguration(), //
+ defaultOsgiConfigs(), //
+ authorOsgiConfigs() //
+ };
+ }
+
+ @Test
+ public void testDistribute() throws Exception {
+ await().until(this::distribute);
+
+ try (Closeable packagePoller = createPoller()) {
+ assertPackageReceived();
+ }
+
+ simulateDiscoveryMessage();
+ await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.singleton(QUEUE_NAME)));
+ assertThat(agent.getQueue(QUEUE_NAME).getStatus().getItemsCount(), equalTo(1));
+ }
+
+ @SuppressWarnings("deprecation")
+ private boolean distribute() throws LoginException {
+ Map<String, Object> authinfo = new HashMap<String, Object>();
+ try (ResourceResolver resolver = resourceResolverFactory.getAdministrativeResourceResolver(authinfo)) {
+ DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
+ DistributionResponse response = distributor.distribute(PUB1_AGENT, resolver, request);
+ log.info(response.getMessage());
+ return response.isSuccessful();
+ }
+ }
+
+ private Closeable createPoller() {
+ HandlerAdapter<PackageMessage> adapter = create(PackageMessage.class, this::handle);
+ return clientProvider.createPoller(TOPIC_PACKAGE, Reset.earliest, adapter);
+ }
+
+ private void assertPackageReceived() throws InterruptedException {
+ assertTrue(messageSem.tryAcquire(10, TimeUnit.SECONDS));
+ PackageMessage pkg = recordedPackage.get();
+ assertEquals(PackageMessage.ReqType.ADD, pkg.getReqType());
+ String path = pkg.getPathsList().iterator().next();
+ assertEquals("/", path);
+ }
+
+ private void simulateDiscoveryMessage() throws InterruptedException {
+ assertEquals(Collections.emptySet(), toSet(agent.getQueueNames()));
+ MessageSender<DiscoveryMessage> discSender = clientProvider.createSender();
+ DiscoveryMessage disc = createDiscoveryMessage(0);
+ discSender.send(TOPIC_DISCOVERY, disc);
+ }
+
+ private DiscoveryMessage createDiscoveryMessage(long offset) {
+ SubscriberState subState = SubscriberState.newBuilder()
+ .setOffset(offset)
+ .setPubAgentName(PUB1_AGENT)
+ .build();
+ return DiscoveryMessage.newBuilder()
+ .setSubSlingId(SUB1_SLING_ID)
+ .setSubAgentName(SUB1_AGENT)
+ .setSubscriberConfiguration(SubscriberConfiguration
+ .newBuilder()
+ .setEditable(false)
+ .setMaxRetries(-1)
+ .build())
+ .addSubscriberState(subState)
+ .build();
+ }
+
+ private static <T> Set<T> toSet(final Iterable<T> iterable) {
+ return StreamSupport.stream(iterable.spliterator(), false)
+ .collect(Collectors.toSet());
+ }
+
+ void handle(MessageInfo info, PackageMessage message) {
+ if (message.getReqType() == ReqType.ADD) {
+ recordedPackage.set(message);
+ messageSem.release();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
new file mode 100644
index 0000000..f0911d7
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import javax.inject.Inject;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Starts an author instance, sends NUM_MESSAGES package messages to the journal and checks that,
+ * once a subscriber announces itself with a discovery message, its queue holds all of them.
+ */
+@RunWith(PaxExamWithKafka.class)
+@ExamReactorStrategy(PerClass.class)
+public class AuthorRestartTest extends DistributionTestSupport {
+ private static final String PUB1_AGENT = "agent1";
+ private static final String SUB1_SLING_ID = UUID.randomUUID().toString();
+ private static final String SUB1_AGENT = "sub1agent";
+ private static final String QUEUE_NAME = SUB1_SLING_ID + "-" + SUB1_AGENT;
+ private static final int NUM_MESSAGES = 2000;
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Inject
+ @Filter(value = "(name=agent1)", timeout = 40000L)
+ private DistributionAgent agent;
+
+ @Inject
+ @Filter
+ private ResourceResolverFactory resourceResolverFactory;
+
+ @Inject
+ private MessagingProvider clientProvider;
+
+ private AtomicReference<PackageMessage> recordedPackage = new AtomicReference<PackageMessage>();
+ private Semaphore messageSem = new Semaphore(0);
+
+ @Configuration
+ public Option[] configuration() {
+ return new Option[] { //
+ //debug(),
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+ baseConfiguration(), //
+ defaultOsgiConfigs(), //
+ authorOsgiConfigs() //
+ };
+ }
+
+ @Test
+ public void testRestartWithExistingMessages() throws Exception {
+ for (int c = 0; c < NUM_MESSAGES; c++) {
+ if (c % 100 == 0) {
+ log.info("Sending message {}", c);
+ }
+ PackageMessage packageMessage = createPackageMessage(c);
+ clientProvider.createSender().send(TOPIC_PACKAGE, packageMessage);
+ }
+ try (Closeable packagePoller = createPoller()) {
+ assertThat(messageSem.tryAcquire(NUM_MESSAGES, 100, TimeUnit.SECONDS), equalTo(true));
+ }
+ await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.emptySet()));
+ DiscoveryMessage disc = createDiscoveryMessage(0);
+ clientProvider.createSender().send(TOPIC_DISCOVERY, disc);
+ await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.singleton(QUEUE_NAME)));
+
+ log.info("Checking Items in queue");
+ for (int c=0; c<20; c++) {
+ int itemsCount = agent.getQueue(QUEUE_NAME).getStatus().getItemsCount();
+ log.info("Items in queue: {}", itemsCount);
+ assertThat(itemsCount, equalTo(NUM_MESSAGES));
+ Thread.sleep(100);
+ }
+ }
+
+ private Closeable createPoller() {
+ HandlerAdapter<PackageMessage> adapter = create(PackageMessage.class, this::handle);
+ return clientProvider.createPoller(TOPIC_PACKAGE, Reset.earliest, adapter);
+ }
+
+ private DiscoveryMessage createDiscoveryMessage(long offset) {
+ SubscriberState subState = SubscriberState.newBuilder()
+ .setOffset(offset)
+ .setPubAgentName(PUB1_AGENT)
+ .build();
+ return DiscoveryMessage.newBuilder()
+ .setSubSlingId(SUB1_SLING_ID)
+ .setSubAgentName(SUB1_AGENT)
+ .setSubscriberConfiguration(SubscriberConfiguration
+ .newBuilder()
+ .setEditable(false)
+ .setMaxRetries(-1)
+ .build())
+ .addSubscriberState(subState)
+ .build();
+ }
+
+ private <T> Set<T> toSet(final Iterable<T> iterable) {
+ return StreamSupport.stream(iterable.spliterator(), false)
+ .collect(Collectors.toSet());
+ }
+
+ private PackageMessage createPackageMessage(int num) throws IOException {
+ return PackageMessage.newBuilder()
+ .setPkgId("myid" + num)
+ .setPubSlingId("pub1sling")
+ .setPubAgentName(PUB1_AGENT)
+ .setPkgType("journal")
+ .setReqType(PackageMessage.ReqType.ADD)
+ .addAllPaths(Arrays.asList("/test"))
+ .setPkgBinary(ByteString.copyFrom(new byte[100]))
+ .build();
+ }
+
+ private void handle(MessageInfo info, PackageMessage message) {
+ if (message.getReqType() == ReqType.ADD) {
+ recordedPackage.set(message);
+ messageSem.release();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
new file mode 100644
index 0000000..a918caa
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
+import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
+import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+
+import com.google.protobuf.ByteString;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.ops4j.pax.exam.util.PathUtils;
+
+import static org.apache.sling.distribution.journal.messages.Messages.*;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+/**
+ * Starts one author instance and one publisher instance.
+ * The author instance also runs the test code.
+ * The test sends 10 invalid distribution packages that fail to import on the publisher.
+ * The test sends a clear command for the first entry and checks that the
+ * package was removed from the queue.
+ * The test sends a clear command for all entries and checks that the queue is cleared.
+ *
+ * The test also covers the remove interface while the REMOVABLE capability is supported.
+ */
+@RunWith(ExtPaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class ClearQueueItemTest extends DistributionTestSupport {
+
+ private static final String PUB1_AGENT = "agent1";
+ private static final String SUB1_AGENT = "subscriber-agent1";
+
+ @Inject
+ @Filter(value = "(name=agent1)", timeout = 40000L)
+ DistributionAgent agent;
+
+ @Inject
+ MessagingProvider clientProvider;
+
+ private static TestContainer publisher;
+ private static KafkaLocal kafka;
+
+ @Configuration
+ public Option[] configuration() {
+ return new Option[] { //
+ //debug(),
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+ baseConfiguration(), //
+ defaultOsgiConfigs(), //
+ authorOsgiConfigs() //
+ };
+ }
+
+ public Option debug() {
+ return CoreOptions.vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+ }
+
+ @BeforeOsgi
+ public static void startPublisher() throws Exception {
+ kafka = new KafkaLocal();
+ DistributionTestSupport.createTopics();
+ publisher = startPublisher(publisherConfig(8182, "RemoveQueueItemTest.publisher1", SUB1_AGENT));
+ }
+
+ private static TestContainer startPublisher(Option[] config) {
+ ExamSystem testSystem;
+ try {
+ testSystem = PaxExamRuntime.createTestSystem(config);
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ TestContainer container = PaxExamRuntime.createContainer(testSystem);
+ container.start();
+ return container;
+ }
+
+ @AfterOsgi
+ public static void stopPublishers() throws IOException {
+ if (publisher != null) {
+ publisher.stop();
+ }
+ closeQuietly(kafka);
+ }
+
+ private static Option[] publisherConfig(int httpPort, String instanceName, String agentName) {
+ String workdir = String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), instanceName);
+ return CoreOptions.options( //
+ new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), //
+ defaultOsgiConfigs(), //
+ publishOsgiConfigs(agentName), //
+ CoreOptions.workingDirectory(workdir)
+ );
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testClearItems() throws Exception {
+
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(this::queueNames, containsInAnyOrder(containsString(SUB1_AGENT)));
+
+ String subAgent1QueueName = agent.getQueueNames().iterator().next();
+
+ testClearMethod(subAgent1QueueName);
+ testRemoveMethod(subAgent1QueueName);
+
+ }
+
+ private void testClearMethod(String subAgent1QueueName)
+ throws Exception {
+
+ sendInvalidPackages(10);
+
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> queueSize(subAgent1QueueName, 10));
+
+ String headEntryId1 = agent.getQueue(subAgent1QueueName).getHead().getId();
+
+ agent.getQueue(subAgent1QueueName).clear(1);
+
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> queueSize(subAgent1QueueName, 9));
+
+ String headEntryId2 = agent.getQueue(subAgent1QueueName).getHead().getId();
+ Assert.assertNotEquals(headEntryId1, headEntryId2);
+
+ agent.getQueue(subAgent1QueueName).clear(-1);
+
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> queueEmpty(subAgent1QueueName));
+ }
+
+ private void testRemoveMethod(String subAgent1QueueName)
+ throws Exception {
+
+ /*
+ * This test must be removed when the
+ * REMOVABLE capability is removed
+ */
+
+ sendInvalidPackages(10);
+
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> queueSize(subAgent1QueueName, 10));
+
+ String headEntryId1 = agent.getQueue(subAgent1QueueName).getHead().getId();
+
+ agent.getQueue(subAgent1QueueName).remove(headEntryId1);
+
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> queueSize(subAgent1QueueName, 9));
+
+ String headEntryId2 = agent.getQueue(subAgent1QueueName).getHead().getId();
+ Assert.assertNotEquals(headEntryId1, headEntryId2);
+
+ String tailEntry = lastEntry(agent.getQueue(subAgent1QueueName).getEntries(0, -1)).getId();
+ Set<String> entryIds = new HashSet<>();
+ entryIds.add(tailEntry);
+ entryIds.add(headEntryId2);
+
+ agent.getQueue(subAgent1QueueName).remove(entryIds);
+
+ await()
+ .atMost(30, TimeUnit.SECONDS)
+ .until(() -> queueEmpty(subAgent1QueueName));
+ }
+
+ private List<String> queueNames() {
+ List<String> queueNames = new ArrayList<>();
+ agent.getQueueNames().forEach(queueNames::add);
+ return queueNames;
+ }
+
+ private boolean queueEmpty(String queueName) {
+ return agent.getQueue(queueName).getStatus().isEmpty();
+ }
+
+ private boolean queueSize(String queueName, int expectedSize) {
+ return agent.getQueue(queueName).getStatus().getItemsCount() == expectedSize;
+ }
+
+ private DistributionQueueEntry lastEntry(Iterable<DistributionQueueEntry> entries) {
+ Iterator<DistributionQueueEntry> iterator = entries.iterator();
+ DistributionQueueEntry last = null;
+ while (iterator.hasNext()) {
+ last = iterator.next();
+ }
+ return last;
+ }
+
+ private void sendInvalidPackages(int nb)
+ throws Exception {
+ MessageSender<PackageMessage> sender = clientProvider.createSender();
+ for (int i = 0 ; i < nb ; i++) {
+ sender.send(TOPIC_PACKAGE, newInvalidPackage(PUB1_AGENT));
+ }
+ }
+
+ private PackageMessage newInvalidPackage(String agentId) throws IOException {
+
+ final byte[] pkgBinary = new byte[2048];
+ new Random().nextBytes(pkgBinary);
+ final List<String> paths = Collections.singletonList("/content/invalid");
+ final List<String> deepPaths = Collections.emptyList();
+ final String pkgId = String.format("package-%s", UUID.randomUUID().toString());
+
+ return PackageMessage.newBuilder()
+ .setPubSlingId("slingid")
+ .setPkgId(pkgId)
+ .setPubAgentName(agentId)
+ .setPkgBinary(ByteString.copyFrom(pkgBinary))
+ .setPkgType("journal")
+ .addAllPaths(paths)
+ .setReqType(PackageMessage.ReqType.ADD)
+ .addAllDeepPaths(deepPaths)
+ .setPkgLength(pkgBinary.length)
+ .build();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/JournalAvailableTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/JournalAvailableTest.java
new file mode 100644
index 0000000..6e8f842
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/JournalAvailableTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * Checks that JournalAvailableChecker correctly detects the presence of the journal and topics.
+ */
+public class JournalAvailableTest {
+
+ @Spy
+ private MessagingProvider provider;
+
+ @Spy
+ Topics topics;
+
+ @InjectMocks
+ JournalAvailableChecker checker;
+
+ @Mock
+ private BundleContext context;
+
+ private KafkaLocal kafka;
+
+ @Before
+ public void before() throws Exception {
+ kafka = new KafkaLocal();
+ DistributionTestSupport.createTopics();
+ this.provider = DistributionTestSupport.createProvider();
+ MockitoAnnotations.initMocks(this);
+ topics.activate(topicsConfiguration(singletonMap("packageTopic", "topic_does_not_exist")));
+ }
+
+ @After
+ public void after() {
+ closeQuietly(kafka);
+ }
+
+ @Test
+ public void test() throws Exception {
+ mockServiceReg();
+ Callable<Boolean> isAvailable = () -> { checker.run(); return checker.isAvailable(); };
+ checker.activate(context);
+ new Thread(checker).start();
+ // Wait a bit as the journal backend from previous test might still be up
+ await().atMost(30, TimeUnit.SECONDS).until(isAvailable, equalTo(false));
+
+ topics.activate(topicsConfiguration(emptyMap()));
+
+ await().atMost(30, TimeUnit.SECONDS).until(isAvailable, equalTo(true));
+
+ IOUtils.closeQuietly(kafka);
+ await().atMost(60, TimeUnit.SECONDS).until(isAvailable, equalTo(false));
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private void mockServiceReg() {
+ ServiceRegistration<JournalAvailable> reg = Mockito.mock(ServiceRegistration.class);
+ when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.eq(checker), Mockito.isNull(Dictionary.class))).thenReturn(reg);
+ }
+
+ private TopicsConfiguration topicsConfiguration(Map<String,String> props) {
+ return standardConverter()
+ .convert(props)
+ .to(TopicsConfiguration.class);
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/LatePipeAuthorDistributeTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/LatePipeAuthorDistributeTest.java
new file mode 100644
index 0000000..4571e5e
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/LatePipeAuthorDistributeTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assume.assumeTrue;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.ToxiproxyClient;
+
+/**
+ * Tests that the journal can come up after the author instance has started,
+ * and that it can be stopped again while the author instance is already running.
+ */
+@RunWith(PaxExamWithKafka.class)
+@ExamReactorStrategy(PerClass.class)
+public class LatePipeAuthorDistributeTest extends DistributionTestSupport {
+ private static final String PUB1_AGENT = "agent1";
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Inject
+ Distributor distributor;
+
+ @Inject
+ ResourceResolverFactory resourceResolverFactory;
+
+ @Inject
+ MessagingProvider clientProvider;
+
+ private ToxiproxyClient proxyClient;
+
+ @Configuration
+ public Option[] configuration() {
+ return new Option[] { //
+ //debug(),
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+ baseConfiguration(), //
+ mvn("com.google.code.gson", "gson"),
+ mvn("eu.rekawek.toxiproxy", "toxiproxy-java"),
+ defaultOsgiConfigs("http://localhost:8083"), //
+ authorOsgiConfigs() //
+ };
+ }
+
+ public Option debug() {
+ return CoreOptions.vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+ }
+
+ @Before
+ public void before() {
+ proxyClient = new ToxiproxyClient();
+ try {
+ proxyClient.getProxies();
+ deleteProxy();
+ } catch (IOException e) {
+ assumeTrue("Toxiproxy server not present. Ignoring", false);
+ }
+
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testDistribute() throws Exception {
+ Map<String, Object> authinfo = new HashMap<String, Object>();
+ try (ResourceResolver resolver = resourceResolverFactory.getAdministrativeResourceResolver(authinfo)) {
+ await().until( () -> bundleContext.getServiceReference(JournalAvailable.class), nullValue());
+ DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
+
+ // Fail as journal is not present
+ DistributionResponse response = distributor.distribute(PUB1_AGENT, resolver, request);
+ Assert.assertFalse(response.getMessage(), response.isSuccessful());
+
+ // Succeed as journal is present over proxy
+ proxyClient.createProxy("journal", "localhost:8083", "localhost:8082");
+ log.info("Created proxy for journal");
+ await().until( () -> bundleContext.getServiceReference(JournalAvailable.class), notNullValue());
+ await().atMost(30, SECONDS).until(() -> distributor.distribute(PUB1_AGENT, resolver, request),
+ hasProperty("successful", equalTo(true)));
+
+ // After deleting the proxy, distribution fails again
+ deleteProxy();
+ log.info("Deleted proxy for journal");
+ await().atMost(30, TimeUnit.SECONDS).until(() -> distributor.distribute(PUB1_AGENT, resolver, request),
+ hasProperty("successful", equalTo(false)));
+ await().until( () -> bundleContext.getServiceReference(JournalAvailable.class), nullValue());
+ } finally {
+ deleteProxy();
+ }
+ }
+
+ private void deleteProxy() throws IOException {
+ Proxy proxy = proxyClient.getProxyOrNull("journal");
+ if (proxy != null) {
+ proxy.delete();
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
new file mode 100644
index 0000000..a013ed5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Starts a publish instance and checks that it can receive and process a PackageMessage from the journal
+ */
+@RunWith(PaxExamWithKafka.class)
+@ExamReactorStrategy(PerClass.class)
+public class PublisherReceiveTest extends DistributionTestSupport {
+ private static final String RESOURCE_PATH = "/my";
+
+ private static final String RESOURCE_TYPE = "sling:Folder";
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Inject
+ ResourceResolverFactory resourceResolverFactory;
+
+ @Inject
+ @Filter("(name=journal)")
+ private DistributionPackageBuilder packageBuilder;
+
+ @Inject
+ @Filter("(name=subscriber-agent1)")
+ DistributionAgent subscriber;
+
+ @Inject
+ MessagingProvider provider;
+ /*
+ @Inject
+ ServiceUserMapper serviceUserMapper;
+ */
+
+ @Configuration
+ public Option[] configuration() {
+ return new Option[] { //
+ //debug(),
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+ baseConfiguration(), //
+ defaultOsgiConfigs(), //
+ publishOsgiConfigs() //
+ };
+ }
+
+ public Option debug() {
+ return CoreOptions.vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+ }
+
+ @Test
+ public void testReceive() throws Exception {
+ Arrays.stream(bundleContext.getBundles())
+ .forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + bundle.getVersion()));
+ DistributionPackage pkg = createDistPackage(RESOURCE_PATH);
+ Messages.PackageMessage pkgMsg = toPackageMessage(pkg, "agent1");
+ provider.createSender().send(TOPIC_PACKAGE, pkgMsg);
+ await().until(() -> getResource(RESOURCE_PATH), notNullValue());
+ }
+
+ /**
+ * Create a resource at the given path, build a DistributionPackage from it and delete the resource again.
+ */
+ private DistributionPackage createDistPackage(String path)
+ throws PersistenceException, DistributionException {
+ try (ResourceResolver resolver = createResolver()){
+ Resource myRes = ResourceUtil.getOrCreateResource(resolver, path, RESOURCE_TYPE, RESOURCE_TYPE, true);
+ log.info("Created resource with path " + myRes.getPath());
+ DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, myRes.getPath());
+ DistributionPackage pkg = packageBuilder.createPackage(resolver, request);
+ resolver.delete(myRes);
+ resolver.commit();
+ return pkg;
+ }
+ }
+
+ private Resource getResource(String path) {
+ try (ResourceResolver resolver = createResolver()) {
+ return resolver.getResource(path);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private ResourceResolver createResolver() {
+ try {
+ Map<String, Object> authinfo = new HashMap<String, Object>();
+ return resourceResolverFactory.getAdministrativeResourceResolver(authinfo);
+ } catch (LoginException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private PackageMessage toPackageMessage(DistributionPackage pkg, String agentId) throws IOException {
+ final byte[] pkgBinary = IOUtils.toByteArray(pkg.createInputStream());
+ final DistributionPackageInfo pkgInfo = pkg.getInfo();
+ final List<String> paths = Arrays.asList(pkgInfo.getPaths());
+ final List<String> deepPaths = Arrays.asList(pkgInfo.get(PROPERTY_REQUEST_DEEP_PATHS, String[].class));
+ final String pkgId = pkg.getId();
+
+ return PackageMessage.newBuilder()
+ .setPubSlingId("slingid")
+ .setPkgId(pkgId)
+ .setPubAgentName(agentId)
+ .setPkgBinary(ByteString.copyFrom(pkgBinary))
+ .setPkgType(pkg.getType())
+ .addAllPaths(paths)
+ .setReqType(PackageMessage.ReqType.ADD)
+ .addAllDeepPaths(deepPaths)
+ .setPkgLength(pkgBinary.length)
+ .build();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
new file mode 100644
index 0000000..44af06a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthenticationException;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.it.ClusterIdCleaner;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.FileUtil;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+import org.ops4j.pax.exam.util.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * - Start a cluster with 1 author and 1 publisher
+ * - Distribute one package
+ * - Stop and clone publisher
+ * - Start 2 publishers with same repo state
+ * - Distribute another package
+ * - Check both have processed both packages
+ */
+public class ScaleUpTest {
+ private static final String AUTHOR = "CloneInstanceTest.author";
+ private static final String PUBLISHER1 = "CloneInstanceTest.publisher1";
+ private static final String PUBLISHER2 = "CloneInstanceTest.publisher2";
+
+ private static final String SUB1_AGENT = "subscriber1-agent1";
+ private static final String SUB2_AGENT = "subscriber2-agent1";
+
+ private static TestContainer author;
+ private static TestContainer publisher1;
+ private static TestContainer publisher2;
+
+ private static final Logger LOG = LoggerFactory.getLogger(ScaleUpTest.class);
+ private MessagingProvider provider;
+ private volatile Semaphore packageReceived = new Semaphore(0);
+ private volatile Semaphore discoveryReceived = new Semaphore(0);
+ private long lastPackageOffset;
+ private Closeable packagePoller;
+ private Closeable discoveryPoller;
+ private volatile Map<String, Long> processedOffsets = new ConcurrentHashMap<>();
+ private KafkaLocal kafka;
+
+ @Before
+ public void before() throws Exception {
+ kafka = new KafkaLocal();
+ DistributionTestSupport.createTopics();
+ this.provider = DistributionTestSupport.createProvider();
+ Topics topics = new Topics();
+ packagePoller = this.provider.createPoller(topics.getPackageTopic(), Reset.earliest, create(PackageMessage.class, this::handle));
+ discoveryPoller = this.provider.createPoller(topics.getDiscoveryTopic(), Reset.earliest, create(DiscoveryMessage.class, this::handleDiscovery));
+ }
+
+ @Ignore
+ @Test
+ public void scaleUp() throws Exception {
+ author = startContainer(authorConfig(8182, AUTHOR));
+ publisher1 = startContainer(publisherConfig(8183, PUBLISHER1, SUB1_AGENT));
+ long offset1 = distribute();
+ await().until(this::numDiscoveredPublishers, equalTo(1));
+ await().atMost(60, SECONDS).until(() -> allProcessed(offset1));
+
+ Path sourceRepo = Paths.get(workDir(PUBLISHER1) + "-repo");
+ Path destRepo = Paths.get(workDir(PUBLISHER2) + "-repo");
+ await().atMost(60, SECONDS).until(() -> sourceRepo.toFile().exists());
+ publisher1.stop();
+ publisher1 = null;
+ this.processedOffsets.clear();
+ clone(sourceRepo, destRepo);
+
+ publisher1 = startContainer(publisherConfig(8183, PUBLISHER1, SUB1_AGENT));
+ publisher2 = startContainer(publisherConfig(8184, PUBLISHER2, SUB2_AGENT));
+ await().until(this::numDiscoveredPublishers, equalTo(2));
+ await().atMost(60, SECONDS).until(() -> allProcessed(offset1));
+ long offset2 = distribute();
+ await().atMost(60, SECONDS).until(() -> allProcessed(offset2));
+ }
+
+ @After
+ public void after() {
+ closeQuietly(packagePoller, discoveryPoller);
+ stopContainer(publisher1);
+ stopContainer(publisher2);
+ stopContainer(author);
+ closeQuietly(kafka);
+ }
+
+ private int numDiscoveredPublishers() {
+ return processedOffsets.keySet().size();
+ }
+
+ private boolean allProcessed(long offset) {
+ boolean allMatch = processedOffsets.values().stream().allMatch(processedOffset -> processedOffset >= offset);
+ return allMatch;
+ }
+
+ private void clone(Path sourceRepo, Path destRepo) throws IOException, InvalidFileStoreVersionException {
+ FileUtil.deleteDir(destRepo.toFile());
+ FileUtil.copyFolder(sourceRepo, destRepo);
+ new ClusterIdCleaner(destRepo.toFile()).deleteClusterId();
+ }
+
+ private void tryAcquire(Semaphore sem) throws InterruptedException {
+ assertTrue(sem.tryAcquire(10, TimeUnit.SECONDS));
+ }
+
+
+
+ private long distribute() throws Exception {
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ createNode(client, "/content");
+ distributeNode(client, "/content");
+ }
+ tryAcquire(packageReceived);
+ return lastPackageOffset;
+ }
+
+ private void createNode(CloseableHttpClient client, String path)
+ throws UnsupportedEncodingException, IOException, ClientProtocolException {
+ HttpPost httpPost = new HttpPost("http://localhost:8182" + path);
+ httpPost.setEntity(new UrlEncodedFormEntity(asList(new BasicNameValuePair("jcr:primaryType","sling:Folder"))));
+ httpPost.addHeader("content-type", "application/x-www-form-urlencoded");
+ CloseableHttpResponse response = client.execute(httpPost);
+ LOG.info(response.getStatusLine().getReasonPhrase());
+ assertThat(response.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_OK));
+ }
+
+ private void distributeNode(CloseableHttpClient client, String path)
+ throws UnsupportedEncodingException, AuthenticationException, IOException, ClientProtocolException {
+ HttpPost httpPost = new HttpPost("http://localhost:8182/libs/sling/distribution/services/agents/agent1");
+ List<BasicNameValuePair> list = Arrays.asList(new BasicNameValuePair("action", "ADD"), new BasicNameValuePair("path", path));
+ httpPost.setEntity(new UrlEncodedFormEntity(list));
+ addAuthHeader(httpPost);
+ httpPost.addHeader("content-type", "application/x-www-form-urlencoded");
+
+ CloseableHttpResponse response = client.execute(httpPost);
+ LOG.info(response.getStatusLine().getReasonPhrase());
+ assertThat(response.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_ACCEPTED));
+ }
+
+ private void addAuthHeader(HttpPost httpPost) throws AuthenticationException {
+ httpPost.addHeader(new BasicScheme().authenticate(new UsernamePasswordCredentials("admin", "admin"), httpPost, null));
+ }
+
+ private void stopContainer(TestContainer container) {
+ if (container != null) {
+ container.stop();
+ }
+ }
+
+
+ private static TestContainer startContainer(Option[] config) {
+ ExamSystem testSystem;
+ try {
+ testSystem = PaxExamRuntime.createTestSystem(config);
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ TestContainer container = PaxExamRuntime.createContainer(testSystem);
+ container.start();
+ return container;
+ }
+
+ private Option[] authorConfig(int httpPort, String instanceName) {
+ return new Option[] { //
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+ new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(), //
+ DistributionTestSupport.defaultOsgiConfigs(), //
+ DistributionTestSupport.authorOsgiConfigs() //
+ };
+ }
+
+ private Option[] publisherConfig(int httpPort, String instanceName, String agentName) {
+ String workdir = workDir(instanceName);
+ FileUtil.deleteDir(new File(workdir));
+ return CoreOptions.options( //
+ new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir +"-repo"), //
+ DistributionTestSupport.defaultOsgiConfigs(), //
+ DistributionTestSupport.publishOsgiConfigs(agentName), //
+ CoreOptions.workingDirectory(workdir)
+ );
+ }
+
+ private String workDir(String instanceName) {
+ return String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), instanceName);
+ }
+
+ private void handleDiscovery(MessageInfo info, DiscoveryMessage message) {
+ List<SubscriberState> stateList = message.getSubscriberStateList();
+ String slingId = message.getSubSlingId();
+ OptionalLong minOffset = stateList.stream().mapToLong(state -> state.getOffset()).min();
+ LOG.info("DiscoveryMessage slingid {} received {} states {}", slingId, minOffset, stateList);
+ processedOffsets.put(slingId, minOffset.orElse(0L));
+ discoveryReceived.release();
+ }
+
+ private void handle(MessageInfo info, PackageMessage message) {
+ if (message.getReqType() == PackageMessage.ReqType.TEST) {
+ return;
+ }
+ LOG.info("PackageMessage received {}, paths {}", info.getOffset(), message.getPathsList());
+ this.lastPackageOffset = info.getOffset();
+ packageReceived.release();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
new file mode 100644
index 0000000..3e11ddb
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import org.apache.sling.distribution.journal.it.DistributionTestBase;
+import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
+import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
+import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+
+import java.io.IOException;
+
+
+@RunWith(ExtPaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class StagedDistributionFailureTest extends DistributionTestBase {
+
+ private static final String SUB1_AGENT = "subscriber-regular";
+ private static final String SUB2_AGENT = "subscriber-golden";
+
+
+ private static TestContainer publish;
+ private static volatile TestContainer golden_publish; // written by the background thread started in beforeOsgi()
+
+
+ private static final String TEST_PATH = "/content/mytest";
+
+
+ @BeforeOsgi
+ public static void beforeOsgi() throws Exception {
+ beforeOsgiBase();
+ publish = startPublishInstance(8182, SUB1_AGENT, false, SUB2_AGENT);
+
+ new Thread(() -> {
+ // Wait for at least one item in publish queue before starting golden publish
+ waitQueueItems(8182, SUB1_AGENT, 1);
+
+ LOG.info("Starting golden publish");
+ golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
+ }).start();
+ }
+
+ @AfterOsgi
+ public static void afterOsgi() throws IOException {
+ if (publish != null) {
+ publish.stop();
+ }
+ if (golden_publish != null) {
+ golden_publish.stop();
+ }
+
+ afterOsgiBase();
+ }
+
+ @Before
+ public void before() {
+ createPath(TEST_PATH);
+
+ waitSubQueues(SUB1_AGENT);
+ }
+
+ @Test
+ public void testDistribute() {
+
+ distribute(TEST_PATH);
+
+ waitSubQueues(SUB1_AGENT, SUB2_AGENT);
+ waitEmptySubQueues();
+
+ waitPath(8182, TEST_PATH);
+ waitPath(8183, TEST_PATH);
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
new file mode 100644
index 0000000..1dba0e1
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import org.apache.sling.distribution.journal.it.DistributionTestBase;
+import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
+import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
+import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+
+import java.io.IOException;
+
+
+@RunWith(ExtPaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class StagedDistributionTest extends DistributionTestBase {
+
+ private static final String SUB1_AGENT = "subscriber-regular";
+ private static final String SUB2_AGENT = "subscriber-golden";
+
+ private static TestContainer golden_publish;
+ private static TestContainer publish;
+
+
+ private static final String TEST_PATH = "/content/mytest";
+
+
+ @BeforeOsgi
+ public static void beforeOsgi() throws Exception {
+ beforeOsgiBase();
+ publish = startPublishInstance(8182, SUB1_AGENT, false, SUB2_AGENT);
+ golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
+
+ }
+
+ @AfterOsgi
+ public static void afterOsgi() throws IOException {
+ if (publish != null) {
+ publish.stop();
+ }
+ if (golden_publish != null) {
+ golden_publish.stop();
+ }
+ afterOsgiBase();
+ }
+
+ @Before
+ public void before() {
+ createPath(TEST_PATH);
+
+ waitSubQueues(SUB1_AGENT, SUB2_AGENT);
+ }
+
+ @Test
+ public void testDistribute() {
+
+ distribute(TEST_PATH);
+
+ waitEmptySubQueues();
+
+ waitPath(8182, TEST_PATH);
+ waitPath(8183, TEST_PATH);
+
+ }
+}
diff --git a/src/test/resources/exam.properties b/src/test/resources/exam.properties
new file mode 100644
index 0000000..c70b3e5
--- /dev/null
+++ b/src/test/resources/exam.properties
@@ -0,0 +1,21 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Switch off default pax logging
+pax.exam.logging = none
+
+# Override default pax exam bundles to avoid clash of atinject
+pax.exam.system = default
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
new file mode 100644
index 0000000..5f21dd4
--- /dev/null
+++ b/src/test/resources/logback.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration>
+ <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%date %level [%thread] %logger %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="info">
+ <appender-ref ref="console"/>
+ </root>
+
+ <logger name="kafka" level="WARN"/>
+ <logger name="org.apache.zookeeper" level="WARN"/>
+ <logger name="org.apache.kafka.clients" level="WARN"/>
+ <logger name="org.apache.sling.jcr.repoinit.impl" level="WARN"/>
+ <logger name="org.apache.jackrabbit.oak.security.internal" level="WARN"/>
+ <logger name="org.ops4j.pax.exam.cm.internal" level="WARN"/>
+ <logger name="org.apache.sling.settings.impl" level="WARN"/>
+ <logger name="org.apache.sling.engine.impl.parameters" level="WARN"/>
+</configuration>
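
Note on the offset bookkeeping used by the test that tracks `processedOffsets` above: the following is a minimal stand-alone sketch of that pattern, not code from this commit. The class name `OffsetTracker` and the method `onDiscovery` are hypothetical; they only mirror what `handleDiscovery`, `numDiscoveredPublishers` and `allProcessed` appear to do in the diff.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.LongStream;

/** Hypothetical illustration only; not part of the commit. */
public class OffsetTracker {

    // Lowest processed offset reported per subscriber Sling id.
    private final Map<String, Long> processedOffsets = new ConcurrentHashMap<>();

    /** Stores the minimum offset across a subscriber's agents, or 0 if none were reported. */
    public void onDiscovery(String slingId, long... agentOffsets) {
        OptionalLong min = LongStream.of(agentOffsets).min();
        processedOffsets.put(slingId, min.orElse(0L));
    }

    /** Number of distinct subscribers seen so far. */
    public int numDiscovered() {
        return processedOffsets.size();
    }

    /** True when every known subscriber has reached at least the given offset. */
    public boolean allProcessed(long offset) {
        return processedOffsets.values().stream().allMatch(p -> p >= offset);
    }
}
```

`allMatch` is vacuously true on an empty map, which is presumably why the test waits for `numDiscoveredPublishers` to equal 2 before it waits on `allProcessed`.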