You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2019/04/16 21:57:48 UTC

[sling-org-apache-sling-distribution-journal-it] 01/01: SLING-8346 - Import Journal based Sling Content Distribution source code

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-it.git

commit c1b04f2835da43d33af0d1f7793c6f175ebd4d62
Author: tmaret <tm...@adobe.com>
AuthorDate: Tue Apr 16 23:57:13 2019 +0200

    SLING-8346 - Import Journal based Sling Content Distribution source code
    
    This project contains the integration tests of the `org.apache.sling.distribution.journal` modules
    
    Initial contributors
    
    *  Christian Schneider @cschneider
    *  Timothee Maret @tmaret
    *  Marius Petria @mpetria
---
 LICENSE                                            | 202 ++++++++++
 README.md                                          |  13 +
 pom.xml                                            | 253 +++++++++++++
 .../distribution/journal/it/ClusterIdCleaner.java  |  79 ++++
 .../journal/it/DistributionTestBase.java           | 277 ++++++++++++++
 .../journal/it/DistributionTestSupport.java        | 421 +++++++++++++++++++++
 .../sling/distribution/journal/it/FileUtil.java    |  54 +++
 .../distribution/journal/it/ext/AfterOsgi.java     |  30 ++
 .../distribution/journal/it/ext/BeforeOsgi.java    |  30 ++
 .../distribution/journal/it/ext/ExtPaxExam.java    |  93 +++++
 .../distribution/journal/it/kafka/KafkaLocal.java  |  91 +++++
 .../distribution/journal/it/kafka/KafkaRule.java   |  46 +++
 .../journal/it/kafka/PaxExamWithKafka.java         |  46 +++
 .../journal/it/kafka/ZooKeeperLocal.java           |  81 ++++
 .../journal/it/tests/Author2PublisherTest.java     | 195 ++++++++++
 .../journal/it/tests/AuthorDistributeTest.java     | 191 ++++++++++
 .../journal/it/tests/AuthorRestartTest.java        | 178 +++++++++
 .../journal/it/tests/ClearQueueItemTest.java       | 277 ++++++++++++++
 .../journal/it/tests/JournalAvailableTest.java     | 116 ++++++
 .../it/tests/LatePipeAuthorDistributeTest.java     | 152 ++++++++
 .../journal/it/tests/PublisherReceiveTest.java     | 175 +++++++++
 .../distribution/journal/it/tests/ScaleUpTest.java | 274 ++++++++++++++
 .../it/tests/StagedDistributionFailureTest.java    |  94 +++++
 .../journal/it/tests/StagedDistributionTest.java   |  86 +++++
 src/test/resources/exam.properties                 |  21 +
 src/test/resources/logback.xml                     |  39 ++
 26 files changed, 3514 insertions(+)

diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..898c182
--- /dev/null
+++ b/README.md
@@ -0,0 +1,13 @@
+org.apache.sling.distribution.journal.it
+==========================================
+
+Toxiproxy
+---------
+
+For some of the tests a local [toxiproxy installation](https://github.com/Shopify/toxiproxy#1-installing-toxiproxy) is needed.
+
+Unfortunately the docker version of toxiproxy does not work correctly on mac. It would need --net=host which does not work on mac as docker is run in a vm. So currently you need to install toxiproxy using brew manually and start it before the tests.
+
+If toxiproxy is not present the respective tests will be skipped with a message that toxiproxy server is missing.
+Because of a bug in pax exam the tests simply succeeed. So be aware that they might not check anything.
+
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..70dd3fa
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,253 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- ======================================================================= -->
+    <!-- P A R E N T   P R O J E C T                                             -->
+    <!-- ======================================================================= -->
+    <parent>
+        <groupId>org.apache.sling</groupId>
+        <artifactId>sling</artifactId>
+        <version>34</version>
+        <relativePath />
+    </parent>
+    <artifactId>org.apache.sling.distribution.journal.it</artifactId>
+    <name>Apache Sling Distribution Journal - IT project</name>
+    <description>
+        Integration Tests project for the Apache Sling Distribution over journal
+    </description>
+    
+    <properties>
+        <org.ops4j.pax.exam.version>4.11.0</org.ops4j.pax.exam.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.servicemix.tooling</groupId>
+                <artifactId>depends-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-depends-file</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <!-- see IDEA-133397 -->
+                <version>2.21.0</version>
+                <configuration>
+                    <redirectTestOutputToFile>false</redirectTestOutputToFile>
+                    <!-- <argLine>-Xmx2048m</argLine> -->
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+  
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.api</artifactId>
+            <version>2.11.0</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.rekawek.toxiproxy</groupId>
+            <artifactId>toxiproxy-java</artifactId>
+            <version>2.1.3</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.hamcrest</artifactId>
+            <version>1.3_1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jsoup</groupId>
+            <artifactId>jsoup</artifactId>
+            <version>1.11.2</version>
+            <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.mime</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.serviceusermapper</artifactId>
+            <version>1.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.jcr.resource</artifactId>
+            <version>3.0.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.metrics</artifactId>
+            <version>1.2.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
+            <version>0.0.1-INCUBATOR-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.distribution.journal</artifactId>
+            <version>0.0.1-INCUBATOR-SNAPSHOT</version>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.distribution.journal.kafka</artifactId>
+            <version>0.0.1-INCUBATOR-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.12</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-segment-tar</artifactId>
+            <version>1.8.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <scope>test</scope>
+            <version>3.2.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-store-spi</artifactId>
+            <version>1.8.2</version>
+        </dependency>
+        
+        <!-- Apache Felix -->
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.framework</artifactId>
+            <version>5.6.10</version>
+            <scope>provided</scope>
+        </dependency>
+                <!-- Sling Testing PaxExam -->
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.testing.paxexam</artifactId>
+            <version>1.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        
+        <!-- Pax Exam -->
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-atinject_1.0_spec</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.tinybundles</groupId>
+            <artifactId>tinybundles</artifactId>
+            <version>3.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam</artifactId>
+            <version>${org.ops4j.pax.exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-cm</artifactId>
+            <version>${org.ops4j.pax.exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-container-forked</artifactId>
+            <version>${org.ops4j.pax.exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-junit4</artifactId>
+            <version>${org.ops4j.pax.exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-link-mvn</artifactId>
+            <version>${org.ops4j.pax.exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
+       <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>3.1.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.converter</artifactId>
+            <version>1.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.testing.osgi-mock</artifactId>
+            <version>2.3.4</version>
+            <scope>test</scope>
+        </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/ClusterIdCleaner.java b/src/test/java/org/apache/sling/distribution/journal/it/ClusterIdCleaner.java
new file mode 100644
index 0000000..377a68f
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/ClusterIdCleaner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+public class ClusterIdCleaner {
+    private NodeStore store;
+    
+    public ClusterIdCleaner(File nodeStoreBase) throws InvalidFileStoreVersionException, IOException {
+        FileStore fileStore = FileStoreBuilder.fileStoreBuilder(nodeStoreBase)
+                .withStrictVersionCheck(true)
+                .build();
+        this.store = SegmentNodeStoreBuilders.builder(fileStore).build();
+    }
+
+    public ClusterIdCleaner(NodeStore store) {
+        this.store = store;
+    }
+    
+    public void deleteClusterId() {
+        NodeBuilder builder = store.getRoot().builder();
+        NodeBuilder clusterConfigNode = builder.getChildNode(
+                ClusterRepositoryInfo.CLUSTER_CONFIG_NODE);
+        
+        if (!clusterConfigNode.exists()) {
+            // if it doesn't exist, then there is no way to delete
+            System.out.println("clusterId was never set or already deleted.");
+            return;
+        }
+        
+        if (!clusterConfigNode.hasProperty(ClusterRepositoryInfo.CLUSTER_ID_PROP)) {
+            // the config node exists, but the clusterId not 
+            // so again, no way to delete
+            System.out.println("clusterId was never set or already deleted.");
+            return;
+        }
+        String oldClusterId = clusterConfigNode.getProperty(ClusterRepositoryInfo.CLUSTER_ID_PROP)
+                .getValue(Type.STRING);
+        clusterConfigNode.removeProperty(ClusterRepositoryInfo.CLUSTER_ID_PROP);
+        try {
+            store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+            System.out.println("clusterId deleted successfully. (old id was " + oldClusterId + ")");
+        } catch (CommitFailedException e) {
+            System.err.println("Failed to delete clusterId due to exception: "+e.getMessage());
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
new file mode 100644
index 0000000..20da677
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+import org.awaitility.Duration;
+import org.hamcrest.Matcher;
+import org.junit.After;
+import org.junit.Before;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+import org.ops4j.pax.exam.util.Filter;
+import org.ops4j.pax.exam.util.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributionTestBase extends DistributionTestSupport {
+    protected static Logger LOG = LoggerFactory.getLogger(DistributionTestBase.class);
+
+    private static KafkaLocal kafka;
+
+    private static final String RESOURCE_TYPE = "sling:Folder";
+    private static final String PUB1_AGENT = "agent1";
+
+    @Inject
+    @Filter(value = "(name=agent1)", timeout = 40000L)
+    DistributionAgent agent;
+
+    @Inject
+    @Filter
+    Distributor distributor;
+
+    @Inject
+    ResourceResolverFactory resourceResolverFactory;
+
+    @Inject
+    MessagingProvider clientProvider;
+
+
+    @Configuration
+    public Option[] configuration() {
+        return new Option[] { //
+                //debug(),
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                        .put("whitelist.bypass", "true").asOption(),
+                baseConfiguration(), //
+                defaultOsgiConfigs(), //
+                authorOsgiConfigs() //
+        };
+    }
+
+    public static void beforeOsgiBase() throws Exception {
+        kafka = new KafkaLocal();
+        DistributionTestSupport.createTopics();
+    }
+
+    public static void afterOsgiBase() {
+        IOUtils.closeQuietly(kafka);
+    }
+
+    @Before
+    public void beforeBase() {
+
+    }
+
+    @After
+    public void afterBase() {
+
+    }
+
+
+    public static TestContainer startPublishInstance(int httpPort, String agentName, boolean editable, String stageAgentName) {
+        ExamSystem testSystem;
+        try {
+            String workdir = String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), "publish-" + httpPort + "-" + UUID.randomUUID().toString());
+            Option[] config = CoreOptions.options( //
+                    new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), //
+                    defaultOsgiConfigs(), //
+                    publishOsgiConfigs(agentName, editable, stageAgentName), //
+                    CoreOptions.workingDirectory(workdir)
+            );
+
+            testSystem = PaxExamRuntime.createTestSystem(config);
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        TestContainer container = PaxExamRuntime.createContainer(testSystem);
+        container.start();
+        return container;
+    }
+
+
+    public void distribute(String path) {
+        try (ResourceResolver resolver = createResolver()) {
+            await().until(() -> tryDistribute(resolver, path), equalTo(true));
+        }
+    }
+
+    private boolean tryDistribute(ResourceResolver resolver, String path) {
+        DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, path);
+        DistributionResponse response = distributor.distribute(PUB1_AGENT, resolver, request);
+        LOG.info("Distribution for path {} ended with message: {} and status: {}", new Object[] { path,
+                response.getMessage(), response.isSuccessful() });
+        return response.isSuccessful();
+    }
+
+    private List<String> queueNames() {
+        List<String> queueNames = new ArrayList<>();
+        agent.getQueueNames().forEach(queueNames::add);
+        return queueNames;
+    }
+
+    private boolean allQueuesEmpty() {
+        return queueNames().stream().allMatch(this::queueEmpty);
+    }
+
+    private boolean queueEmpty(String queueName) {
+        return agent.getQueue(queueName).getStatus().getItemsCount() == 0;
+    }
+
+    @SuppressWarnings({ "deprecation" })
+    private ResourceResolver createResolver() {
+        try {
+            Map<String, Object> authinfo = new HashMap<String, Object>();
+            return resourceResolverFactory.getAdministrativeResourceResolver(authinfo);
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected void createPath(String path) {
+        try (ResourceResolver resolver = createResolver()){
+            ResourceUtil.getOrCreateResource(resolver, path, RESOURCE_TYPE, RESOURCE_TYPE, true);
+        } catch (Exception e) {
+            LOG.error("cannot create path", e);
+        }
+    }
+
+    private static int tryGetPath(int httpPort, String path) {
+        String url = String.format("http://localhost:%s%s.json", httpPort, path);
+        HttpGet httpGet = new HttpGet(url);
+        Header authHeader = null;
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            authHeader = new BasicScheme().authenticate(new UsernamePasswordCredentials("admin", "admin"), httpGet, null);
+            httpGet.addHeader(authHeader);
+
+
+            CloseableHttpResponse response = client.execute(httpGet);
+            int status =  response.getStatusLine().getStatusCode();
+            LOG.info("try get path {} with status {}", url, status);
+            return status;
+
+        } catch (Exception e) {
+            LOG.error("cannot get path {}", url, e);
+        }
+        return  -1;
+    }
+
+    protected static void waitPath(int httpPort, String path) {
+        await().atMost(30, TimeUnit.SECONDS)
+            .until(() -> tryGetPath(httpPort, path), equalTo(200));
+    }
+
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public Iterable<String> waitSubQueues(String... queues) {
+        Matcher[] matchers = Stream.of(queues).map(q -> containsString(q)).toArray(Matcher[]::new);
+
+        await().atMost(Duration.FIVE_MINUTES)
+                .until(this::queueNames, containsInAnyOrder(matchers));
+
+        Iterable<String> queueNames = agent.getQueueNames();
+        LOG.info("Subscriber Queues: " + String.join(", ", queueNames));
+
+        return queueNames;
+    }
+
+    protected void waitEmptySubQueues() {
+        await().atMost(60, TimeUnit.SECONDS)
+                .until(this::allQueuesEmpty, equalTo(true));
+    }
+
+
+    static protected void waitQueueItems(int httpPort, String agentName, int count) {
+        await().atMost(Duration.FIVE_MINUTES)
+                .until(() -> tryGetQueueItems(httpPort, agentName), equalTo(count));
+        LOG.info("Items count {} for agent {}", count, agentName + "-" + httpPort);
+
+    }
+
+    static private int tryGetQueueItems(int httpPort, String agentName) {
+        String url = String.format("http://localhost:%s/libs/sling/distribution/services/agents/%s/queues.2.json", httpPort, agentName);
+        HttpGet httpGet = new HttpGet(url);
+        Header authHeader = null;
+        try (CloseableHttpClient client = HttpClients.createDefault()) {
+            authHeader = new BasicScheme().authenticate(new UsernamePasswordCredentials("admin", "admin"), httpGet, null);
+            httpGet.addHeader(authHeader);
+
+
+            CloseableHttpResponse response = client.execute(httpGet);
+            String text = IOUtils.toString(response.getEntity().getContent(), Charset.defaultCharset());
+            if (text == null) {
+                return -1;
+            }
+
+            String itemsCount = StringUtils.substringBetween(text, "itemsCount\":", ",");
+            if (itemsCount == null) {
+                return -1;
+            }
+
+            return Integer.parseInt(itemsCount.trim());
+        } catch (Throwable e) {
+            LOG.error("cannot get items count {}", url, e);
+        }
+        return  -1;
+    }
+
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
new file mode 100644
index 0000000..85ae96c
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+
+import static java.lang.Boolean.getBoolean;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.admin.AdminClient.create;
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.sling.testing.paxexam.SlingOptions.slingDistribution;
+import static org.apache.sling.testing.paxexam.SlingOptions.slingQuickstartOak;
+import static org.ops4j.pax.exam.Constants.START_LEVEL_SYSTEM_BUNDLES;
+import static org.ops4j.pax.exam.CoreOptions.bootDelegationPackage;
+import static org.ops4j.pax.exam.CoreOptions.bundle;
+import static org.ops4j.pax.exam.CoreOptions.composite;
+import static org.ops4j.pax.exam.CoreOptions.frameworkStartLevel;
+import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+import static org.ops4j.pax.exam.CoreOptions.url;
+import static org.ops4j.pax.exam.CoreOptions.vmOption;
+import static org.ops4j.pax.exam.CoreOptions.when;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.factoryConfiguration;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.io.File;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.kafka.KafkaClientProvider;
+import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
+import org.apache.sling.distribution.serialization.impl.vlt.VaultDistributionPackageBuilderFactory;
+import org.apache.sling.testing.paxexam.SlingOptions;
+import org.apache.sling.testing.paxexam.TestSupport;
+import org.ops4j.pax.exam.ConfigurationManager;
+import org.ops4j.pax.exam.Constants;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.options.CompositeOption;
+import org.ops4j.pax.exam.options.DefaultCompositeOption;
+import org.ops4j.pax.exam.options.libraries.JUnitBundlesOption;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.cm.ConfigurationAdmin;
+
+import com.google.common.collect.ImmutableMap;
+
+public class DistributionTestSupport extends TestSupport {
+    public static final String TOPIC_PACKAGE = "aemdistribution_package";
+    public static final String TOPIC_DISCOVERY = "aemdistribution_discovery";
+    public static final String TOPIC_COMMAND = "aemdistribution_command";
+    public static final String TOPIC_STATUS = "aemdistribution_status";
+    public static final String TOPIC_EVENT = "aemdistribution_event";
+    
+    @Inject
+    protected BundleContext bundleContext;
+
+    @Inject
+    protected ConfigurationAdmin configAdmin;
+
+    private int httpPort = 8181;
+
+    public DistributionTestSupport withHttpPort(int httpPort) {
+        this.httpPort = httpPort;
+        return this;
+    }
+    
+    public Option baseConfiguration() {
+        String workingDirectory = workingDirectory();
+        FileUtil.deleteDir(new File(workingDirectory));
+        return baseConfiguration(workingDirectory);
+    }
+
+    public Option baseConfiguration(String baseDirectory) {
+        // Patch versions of features provided by SlingOptions
+        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.commons.mime");
+        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.commons.metrics");
+        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.core");
+        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal");
+        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal.messages");
+        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal.kafka");
+        SlingOptions.versionResolver.setVersionFromProject("io.dropwizard.metrics", "metrics-core");
+        SlingOptions.versionResolver.setVersion("org.slf4j", "log4j-over-slf4j", "1.7.6");
+
+        Option baseOptions = composite(
+                super.baseConfiguration(),
+                SlingOptions.logback(),
+                mavenBundle().groupId("org.slf4j").artifactId("log4j-over-slf4j").version(SlingOptions.versionResolver),
+                
+                // The base sling Quickstart
+                slingQuickstart(baseDirectory),
+                mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.webconsole.plugins.ds").version(SlingOptions.versionResolver),
+                mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.metrics").version(SlingOptions.versionResolver),
+
+                mvn("com.google.protobuf", "protobuf-java"),
+                kafka(),
+
+                // The bundle built (org.apache.sling.distribution.journal)
+                mvn("org.apache.sling", "org.apache.sling.distribution.journal"),
+                mvn("org.apache.sling", "org.apache.sling.distribution.journal.messages"),
+                mvn("org.apache.sling", "org.apache.sling.distribution.journal.kafka"),
+
+                // distribution bundles
+                slingDistribution(),
+
+                // testing
+                //slingResourcePresence(), // see https://github.com/apache/sling-org-apache-sling-resource-presence
+                jsoup(),
+                myJunitBundles()
+
+        );
+
+        // Remote debugger on the forked JVM
+        // Run with mvn install -DisDebugEnabled=true
+        return getBoolean("isDebugEnabled")
+                ? composite(remoteDebug(), baseOptions)
+                : baseOptions;
+    }
+
+    public static CompositeOption myJunitBundles() {
+        return new DefaultCompositeOption(
+                composite(defaultTestSystemOptions()),
+                new JUnitBundlesOption(), 
+                systemProperty("pax.exam.invoker").value("junit"),
+                mvn("org.mockito", "mockito-all"),
+                mvn("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.hamcrest"),
+                mvn("org.awaitility", "awaitility"),
+                bundle("link:classpath:META-INF/links/org.ops4j.pax.exam.invoker.junit.link"));
+    }
+    
+    /**
+     * Dependencies for mockito 2
+     */
+    public static Option mockito2() {
+        return composite(
+                mvn("org.objenesis", "objenesis"),
+                mvn("net.bytebuddy", "byte-buddy"),
+                mvn("net.bytebuddy", "byte-buddy-agent"),
+                mvn("org.mockito", "mockito-core")
+                );
+    }
+    
+    /**
+     * Standard test system options with just at inject removed as it collides with
+     * the one provided in sling 
+     */
+    private static Option[] defaultTestSystemOptions() {
+        ConfigurationManager cm = new ConfigurationManager();
+        String logging = cm.getProperty(Constants.EXAM_LOGGING_KEY,
+            Constants.EXAM_LOGGING_PAX_LOGGING);
+
+        return new Option[] {
+            bootDelegationPackage("sun.*"),
+            frameworkStartLevel(Constants.START_LEVEL_TEST_BUNDLE),
+            url("link:classpath:META-INF/links/org.ops4j.pax.exam.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            url("link:classpath:META-INF/links/org.ops4j.pax.exam.inject.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            url("link:classpath:META-INF/links/org.ops4j.pax.extender.service.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            url("link:classpath:META-INF/links/org.osgi.compendium.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+
+            when(logging.equals(Constants.EXAM_LOGGING_PAX_LOGGING)).useOptions(
+                url("link:classpath:META-INF/links/org.ops4j.pax.logging.api.link").startLevel(
+                    START_LEVEL_SYSTEM_BUNDLES)),
+
+            url("link:classpath:META-INF/links/org.ops4j.base.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.core.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.extender.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.framework.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.lifecycle.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.tracker.link").startLevel(
+                START_LEVEL_SYSTEM_BUNDLES),
+            /*
+            url("link:classpath:META-INF/links/org.apache.geronimo.specs.atinject.link")
+                .startLevel(START_LEVEL_SYSTEM_BUNDLES) };
+                */
+        };
+    }
+    
+    protected Option slingQuickstart(String baseDirectory) {
+        final String workingDirectory = String.format("%s/instance", baseDirectory);
+        final String dataStoreDirectory = String.format("%s/shareddatastore", baseDirectory);
+        System.out.println(String.format("Quickstart with workingDirectory %s, port %s", workingDirectory, httpPort));
+        return slingQuickstartOakSharedBlobstore(workingDirectory, dataStoreDirectory, httpPort);
+    }
+    
+    public static Option defaultOsgiConfigs() {
+        return defaultOsgiConfigs("");
+    }
+
+    /**
+     * OSGI configurations targeted to author and publish instances
+     */
+    protected static Option defaultOsgiConfigs(String journalEndpoint) {
+        return composite(
+                newConfiguration("org.apache.sling.jcr.resource.internal.JcrSystemUserValidator")
+                    .put("allow.only.system.user", false).asOption(),
+                    
+                // For production the users would be: replication-service,content-writer-service
+                factoryConfiguration("org.apache.sling.serviceusermapping.impl.ServiceUserMapperImpl.amended")
+                        .put("user.mapping", new String[]{"org.apache.sling.distribution.journal:bookkeeper=admin","org.apache.sling.distribution.journal:importer=admin"})
+                        .asOption(),
+
+                factoryConfiguration(VaultDistributionPackageBuilderFactory.class.getName())
+                        .put("name", "journal")
+                        .put("type", "inmemory")
+                        .put("useBinaryReferences", "true")
+                        .put("aclHandling", "IGNORE")
+                        .put("package.filters", new String[]{"/home/users|-.*/.tokens", "/home/users|-.*/rep:cache"})
+                        .put("property.filters", new String[]{"/|-^.*/cq:lastReplicated|-^.*/cq:lastReplicatedBy|-^.*/cq:lastReplicationAction"})
+                        .asOption(),
+
+                newConfiguration("org.apache.sling.distribution.journal.kafka.KafkaClientProvider")
+                    .asOption(),
+
+                newConfiguration("org.apache.sling.distribution.component.impl.DistributionComponentFactoryMap")
+                        .put("mapping.agent", new String[]{//
+                                "pub:org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"})
+                        .asOption(),
+
+                newConfiguration("org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker")
+                        .put("scheduler.period", 1L)
+                        .asOption()
+
+        );
+
+    }
+
+    /**
+     * OSGI configurations targeted to the author instances only
+     */
+    public static Option authorOsgiConfigs() {
+        return composite(
+                factoryConfiguration("org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory")
+                        .put("name", "agent1")
+                        .put("packageBuilder.target", "(name=journal)")
+                        .asOption(),
+                factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
+                    .put("kind", "agent")
+                    .put("provider.roots", "/libs/sling/distribution/services/agents")
+                    .asOption(),
+                 factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
+                    .put("kind", "exporter")
+                    .put("provider.roots", "/libs/sling/distribution/services/exporters")
+                    .asOption(),
+                 factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
+                    .put("kind", "importer")
+                    .put("provider.roots", "/libs/sling/distribution/services/importers")
+                    .asOption()
+        );
+
+    }
+    
+    protected static Option publishOsgiConfigs() {
+        return publishOsgiConfigs("subscriber-agent1");
+    }
+
+    /**
+     * OSGI configuration targeted to the publish instances only
+     */
+    public static Option publishOsgiConfigs(String agentName) {
+        return publishOsgiConfigs(agentName, true, null);
+
+    }
+
+    protected static Option publishOsgiConfigs(String agentName, boolean editable, String stage) {
+
+
+        Option subConfig = composite(
+                factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
+                        .put("kind", "agent")
+                        .put("provider.roots", "/libs/sling/distribution/services/agents")
+                        .asOption(),
+
+                factoryConfiguration("org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
+                        .put("name", agentName)
+                        .put("agentNames", new String[]{"agent1"})
+                        .put("packageBuilder.target", "(name=journal)")
+                        .put("precondition.target",  stage != null ? "(name=staging)" : "(name=default)")
+                        .put("editable", editable)
+                        .put("announceDelay", "500")
+                        .asOption());
+
+        Option condConfig =  newConfiguration("org.apache.sling.distribution.journal.impl.subscriber.StagingPrecondition")
+                .put("subAgentName", stage)
+                .asOption();
+
+        return stage != null ? composite(subConfig, condConfig) : subConfig;
+
+    }
+
+
+    protected static Option remoteDebug() {
+        return remoteDebug(5005);
+    }
+
+    protected static Option remoteDebug(int debugPort) {
+        System.out.println(String.format("Remote debugger on port: %s", debugPort));
+        return vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005");
+    }
+
+    private static Option slingQuickstartOakSharedBlobstore(String workingDirectory, String dataStoreDirectory, int httpPort) {
+
+        // Option for a quickstart running an Oak repository with a nodestore and a shared data store.
+
+        String slingHome = String.format("%s/sling", workingDirectory);
+        String repositoryHome = String.format("%s/repository", slingHome);
+        String localIndexDir = String.format("%s/index", repositoryHome);
+        return composite(
+
+                slingQuickstartOak(),
+
+                mavenBundle()
+                        .groupId("org.apache.jackrabbit")
+                        .artifactId("oak-lucene")
+                        .version(SlingOptions.versionResolver),
+
+                mavenBundle()
+                        .groupId("org.apache.jackrabbit")
+                        .artifactId("oak-segment-tar")
+                        .version(SlingOptions.versionResolver),
+
+                mavenBundle()
+                        .groupId("org.apache.sling")
+                        .artifactId("org.apache.sling.jcr.oak.server")
+                        .version(SlingOptions.versionResolver),
+
+                newConfiguration("org.apache.felix.http")
+                        .put("org.osgi.service.http.port", httpPort).asOption(),
+
+                newConfiguration("org.apache.jackrabbit.oak.segment.SegmentNodeStoreService")
+                        .put("customBlobStore", true)
+                        .put("repository.home", repositoryHome)
+                        .put("name", "NodeStore with custom blob store").asOption(),
+
+                newConfiguration("org.apache.jackrabbit.oak.plugins.blob.datastore.FileDataStore")
+                        .put("path", dataStoreDirectory)
+                        .put("minRecordLength", 16384).asOption(),
+
+                newConfiguration("org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexProviderService")
+                        .put("localIndexDir", localIndexDir).asOption()
+        );
+    }
+    
+    private static Option kafka() {
+        return composite(
+                mvn("com.fasterxml.jackson.core", "jackson-core"),
+                mvn("com.fasterxml.jackson.core", "jackson-annotations"),
+                mvn("com.fasterxml.jackson.core", "jackson-databind"),
+                mvn("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.kafka-clients")
+        );
+    }
+    
+    public static Option mvn(String groupId, String artifactId) {
+        return mavenBundle().groupId(groupId).artifactId(artifactId).versionAsInProject();
+    }
+
+    private static Option jsoup() {
+        return mavenBundle().groupId("org.jsoup").artifactId("jsoup").versionAsInProject();
+    }
+
+    public static void createTopic(String topicName) {
+        NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
+        try (AdminClient admin = create(singletonMap(BOOTSTRAP_SERVERS_CONFIG,
+                standardConverter().convert(kafkaProperties()).to(KafkaEndpoint.class).kafkaBootstrapServers()))) {
+            CreateTopicsResult result = admin.createTopics(singletonList(newTopic));
+            result.values().get(topicName).get();
+        } catch (Exception e) {
+            throw new RuntimeException(format("Failed to create topic %s", topicName), e);
+        }
+    }
+
+    public static void createTopics() {
+        createTopic(TOPIC_DISCOVERY);
+        createTopic(TOPIC_PACKAGE);
+        createTopic(TOPIC_COMMAND);
+        createTopic(TOPIC_STATUS);
+        createTopic(TOPIC_EVENT);
+    }
+
+    public static MessagingProvider createProvider() {
+        KafkaClientProvider provider = new KafkaClientProvider();
+        provider.activate(standardConverter().convert(kafkaProperties()).to(KafkaEndpoint.class));
+        return provider;
+    }
+
+    private static Map<String,String> kafkaProperties() {
+         return ImmutableMap.of(
+                "kafkaDefaultApiTimeout", "5000",
+                "kafkaConnectTimeout", "32000");
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/FileUtil.java b/src/test/java/org/apache/sling/distribution/journal/it/FileUtil.java
new file mode 100644
index 0000000..a40b327
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/FileUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+
+public class FileUtil {
+    private FileUtil() {
+    }
+
+    public static void copyFolder(Path src, Path dest) throws IOException{
+        Files.walk(src).forEach(source -> copy(source, dest.resolve(src.relativize(source))));
+    }
+
+    private static void copy(Path source, Path dest) {
+        try {
+            Files.copy(source, dest, REPLACE_EXISTING);
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+    
+
+    public static void deleteDir(File dir) {
+        try {
+            FileUtils.deleteDirectory(dir);
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/ext/AfterOsgi.java b/src/test/java/org/apache/sling/distribution/journal/it/ext/AfterOsgi.java
new file mode 100644
index 0000000..da980b4
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/ext/AfterOsgi.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.ext;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface AfterOsgi {
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/ext/BeforeOsgi.java b/src/test/java/org/apache/sling/distribution/journal/it/ext/BeforeOsgi.java
new file mode 100644
index 0000000..c504cd2
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/ext/BeforeOsgi.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.ext;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface BeforeOsgi {
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/ext/ExtPaxExam.java b/src/test/java/org/apache/sling/distribution/journal/it/ext/ExtPaxExam.java
new file mode 100644
index 0000000..33603d3
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/ext/ExtPaxExam.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.ext;
+
+import static java.util.Arrays.asList;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Optional;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.model.InitializationError;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+/**
+ * Shows how to extend the PaxExam runner to implement hooks that are executed outside of OSGi in the parent
+ * junit process 
+ */
+public class ExtPaxExam extends PaxExam {
+
+    private Optional<Method> before;
+    private Optional<Method> after;
+
+    public ExtPaxExam(Class<?> klass) throws InitializationError {
+        super(klass);
+        this.before = getStaticMethodWith(klass, BeforeOsgi.class);
+        this.after = getStaticMethodWith(klass, AfterOsgi.class);
+    }
+
+    @Override
+    public void run(RunNotifier notifier) {
+        /**
+         *  If a test is run more than once from the ide the by default the pax exam dir is not deleted.
+         *  This causes bundles to be installed multiple times. Deleting the pax exam dir to avoid this.
+         */
+        deleteDir(new File("target/paxexam"));
+        this.before.ifPresent(this::invoke);
+        try {
+            super.run(notifier);
+        } finally {
+            this.after.ifPresent(this::invoke);
+        }
+    }
+    
+    private void deleteDir(File dir) {
+        try {
+            FileUtils.deleteDirectory(dir);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+    
+    private Optional<Method> getStaticMethodWith(Class<?> klass, Class<? extends Annotation> annotation) {
+        Optional<Method> foundMethod = asList(klass.getMethods()).stream()
+            .filter(method -> method.getAnnotation(annotation) != null)
+            .findFirst();
+        if (foundMethod.isPresent()) {
+            Method m = foundMethod.get();
+            if (!Modifier.isStatic(m.getModifiers())) {
+                throw new IllegalStateException("Method " + m.getName() + " must be static to be used as " + annotation.getName());
+            }
+        }
+        return foundMethod;
+    }
+
+    private void invoke(Method method) {
+        try {
+            method.invoke(null);
+        } catch (Exception e) {
+            throw new RuntimeException("Error calling method " + method, e);
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaLocal.java b/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaLocal.java
new file mode 100644
index 0000000..658f075
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaLocal.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.VerifiableProperties;
+import scala.Some;
+import scala.collection.Seq;
+
+public class KafkaLocal implements Closeable {
+    Logger LOG = LoggerFactory.getLogger(KafkaLocal.class);
+    
+    public KafkaServer kafka;
+    public ZooKeeperLocal zookeeper;
+    
+    public KafkaLocal() throws Exception {
+        this(kafkaProperties(), zookeeperProperties());
+    }
+
+    public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws Exception {
+        zookeeper = new ZooKeeperLocal(zkProperties);
+        KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+        Seq<KafkaMetricsReporter> reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(kafkaProperties));
+        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+        kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, new Some<String>("kafka"), reporters);
+        kafka.startup();
+    }
+
+    @Override
+    public void close() throws IOException {
+        System.out.println("stopping kafka...");
+        try {
+            kafka.shutdown();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        zookeeper.close();
+        System.out.println("done");
+    }
+
+    private static Properties kafkaProperties() {
+        String logDir = "target/kafka-" + UUID.randomUUID().toString();
+        Properties kafkaProps = new Properties();
+        kafkaProps.put("zookeeper.connect", "localhost:2181");
+        kafkaProps.put("advertised.host.name", "localhost");
+        kafkaProps.put("host.name","localhost");
+        kafkaProps.put("port", "9092");
+        kafkaProps.put("auto.create.topics.enable", false);
+        kafkaProps.put("broker.id", "0");
+        kafkaProps.put("log.dir",logDir);
+        kafkaProps.put("group.initial.rebalance.delay.ms", "0");
+        kafkaProps.put("group.min.session.timeout.ms", "1000");
+        return kafkaProps;
+    }
+
+    private static Properties zookeeperProperties() {
+        Properties zkProps = new Properties();
+        UUID uuid = UUID.randomUUID();
+        zkProps.put("dataDir", "target/zookeeper/"+uuid.toString());
+        zkProps.put("clientPort", "2181");
+        return zkProps;
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaRule.java
new file mode 100644
index 0000000..afc1291
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/kafka/KafkaRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.kafka;
+
+import java.io.IOException;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+public class KafkaRule implements TestRule {
+
+    @Override
+    public Statement apply(Statement base, Description description) {
+        return new Statement() {
+
+            @Override
+            public void evaluate() throws Throwable {
+                runWithKafka(base);
+            }
+        };
+    }
+
+    private void runWithKafka(Statement base) throws Throwable, IOException, Exception {
+        try (KafkaLocal kafka = new KafkaLocal()) {
+            base.evaluate();
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/kafka/PaxExamWithKafka.java b/src/test/java/org/apache/sling/distribution/journal/it/kafka/PaxExamWithKafka.java
new file mode 100644
index 0000000..40c6365
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/kafka/PaxExamWithKafka.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.kafka;
+
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.model.InitializationError;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaxExamWithKafka extends PaxExam {
+
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    public PaxExamWithKafka(Class<?> klass) throws InitializationError {
+        super(klass);
+    }
+
+    @Override
+    public void run(RunNotifier notifier) {
+        try (KafkaLocal kafka = new KafkaLocal()){
+            DistributionTestSupport.createTopics();
+            super.run(notifier);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/kafka/ZooKeeperLocal.java b/src/test/java/org/apache/sling/distribution/journal/it/kafka/ZooKeeperLocal.java
new file mode 100644
index 0000000..98a9cf4
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/kafka/ZooKeeperLocal.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.kafka;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperLocal implements Closeable {
+    static Logger LOG = LoggerFactory.getLogger(ZooKeeperLocal.class);
+    MyZooKeeperServerMain zooKeeperServer;
+
+    public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException, ConfigException {
+        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+        quorumConfiguration.parseProperties(zkProperties);
+        zooKeeperServer = new MyZooKeeperServerMain(quorumConfiguration);
+
+        new Thread() {
+            public void run() {
+                try {
+                    zooKeeperServer.startup();
+                } catch (IOException e) {
+                    System.out.println("ZooKeeper Failed");
+                    e.printStackTrace(System.err);
+                }
+            }
+        }.start();
+    }
+
+    @Override
+    public void close() throws IOException {
+        zooKeeperServer.shutdown();
+    }
+    
+    static class MyZooKeeperServerMain extends ZooKeeperServerMain {
+
+        private QuorumPeerConfig config;
+
+        MyZooKeeperServerMain(QuorumPeerConfig config) {
+            this.config = config;
+        }
+
+        public void startup() throws IOException {
+            ServerConfig serverConfig = new ServerConfig();
+            serverConfig.readFrom(config);
+            runFromConfig(serverConfig);
+        }
+
+        public void shutdown() {
+            try {
+                super.shutdown();
+            } catch (Exception e) {
+                LOG.error("Error shutting down ZooKeeper", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/Author2PublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/Author2PublisherTest.java
new file mode 100644
index 0000000..7dc5f95
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/Author2PublisherTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.ops4j.pax.exam.util.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
+import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
+import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+
+/**
+ * Starts one author instance and two publisher instances.
+ * The author instance also runs the test code.
+ * The test triggers a content distribution and checks it is correctly processed by
+ * both publish instances.
+ */
+@RunWith(ExtPaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class Author2PublisherTest extends DistributionTestSupport {
+    private static final String PUB1_AGENT = "agent1";
+    private static final String SUB1_AGENT = "subscriber-agent1";
+    private static final String SUB2_AGENT = "subscriber-agent2";
+
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Inject
+    @Filter(value = "(name=agent1)", timeout = 40000L)
+    DistributionAgent agent;
+    
+    @Inject
+    @Filter
+    Distributor distributor;
+
+    @Inject
+    @Filter
+    ResourceResolverFactory resourceResolverFactory;
+    
+    @Inject
+    MessagingProvider clientProvider;
+    
+    private static TestContainer publisher;
+
+    private static TestContainer publisher2;
+    private static KafkaLocal kafka;
+    
+    @Configuration
+    public Option[] configuration() {
+        return new Option[] { //
+                //debug(),
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                        .put("whitelist.bypass", "true").asOption(),
+                baseConfiguration(), //
+                defaultOsgiConfigs(), //
+                authorOsgiConfigs() //
+        };
+    }
+
+    public Option debug() {
+        return CoreOptions.vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+    }
+    
+    @BeforeOsgi
+    public static void startPublishers() throws Exception {
+        kafka = new KafkaLocal();
+        DistributionTestSupport.createTopics();
+        publisher = startPublisher(publisherConfig(8182,  "Author2PublisherTest.publisher1", SUB1_AGENT));
+        publisher2 = startPublisher(publisherConfig(8183, "Author2PublisherTest.publisher2", SUB2_AGENT));
+    }
+
+    private static TestContainer startPublisher(Option[] config) {
+        ExamSystem testSystem;
+        try {
+            testSystem = PaxExamRuntime.createTestSystem(config);
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        TestContainer container = PaxExamRuntime.createContainer(testSystem);
+        container.start();
+        //new Thread(container::start).start();
+        return container;
+    }
+    
+    @AfterOsgi
+    public static void stopPublishers() throws IOException {
+        if (publisher != null) {
+            publisher.stop();
+        }
+        if (publisher2 != null) {
+            publisher2.stop();
+        }
+        closeQuietly(kafka);
+    }
+    
+    private static Option[] publisherConfig(int httpPort, String instanceName, String agentName) {
+        String workdir = String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), instanceName);
+        return CoreOptions.options( //
+                new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), //
+                defaultOsgiConfigs(), //
+                publishOsgiConfigs(agentName), //
+                CoreOptions.workingDirectory(workdir)
+        );
+    }
+
+    @SuppressWarnings({ "deprecation", "unchecked" })
+    @Test
+    public void testDistribute() throws Exception {
+        Map<String, Object> authinfo = new HashMap<String, Object>();
+        try (ResourceResolver resolver = resourceResolverFactory.getAdministrativeResourceResolver(authinfo)) {
+            await().until(() -> distribute(resolver), equalTo(true));
+        }
+        await()
+            .atMost(30, TimeUnit.SECONDS)
+            .until(this::queueNames, containsInAnyOrder(containsString(SUB1_AGENT),  containsString(SUB2_AGENT)));
+        await()
+            .atMost(30, TimeUnit.SECONDS)
+            .until(this::allQueuesEmpty, equalTo(true));
+        System.out.println("Queuenames " + agent.getQueueNames());
+    }
+    
+    private boolean distribute(ResourceResolver resolver) {
+        DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
+        DistributionResponse response = distributor.distribute(PUB1_AGENT, resolver, request);
+        log.info(response.getMessage());
+        return response.isSuccessful();
+    }
+
+    private List<String> queueNames() {
+        List<String> queueNames = new ArrayList<>();
+        agent.getQueueNames().forEach(queueNames::add);
+        return queueNames;
+    }
+    
+    private boolean allQueuesEmpty() {
+        return queueNames().stream().allMatch(this::queueEmpty);
+    }
+    
+    private boolean queueEmpty(String queueName) {
+        return agent.getQueue(queueName).getStatus().getItemsCount() == 0;
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
new file mode 100644
index 0000000..c45b199
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import javax.inject.Inject;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts an author instance, triggers a content distribution and checks that the package arrives
+ * on the journal.
+ */
+@RunWith(PaxExamWithKafka.class)
+@ExamReactorStrategy(PerClass.class)
+public class AuthorDistributeTest extends DistributionTestSupport {
+    private static final String PUB1_AGENT = "agent1";
+    private static final String SUB1_SLING_ID = UUID.randomUUID().toString();
+    private static final String SUB1_AGENT = "sub1agent";
+    private static final String QUEUE_NAME = SUB1_SLING_ID + "-" + SUB1_AGENT;
+
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Inject
+    @Filter(value = "(name=agent1)", timeout = 40000L)
+    DistributionAgent agent;
+    
+    @Inject
+    @Filter
+    Distributor distributor;
+
+    @Inject
+    @Filter
+    ResourceResolverFactory resourceResolverFactory;
+    
+    @Inject
+    MessagingProvider clientProvider;
+    
+    private AtomicReference<PackageMessage> recordedPackage = new AtomicReference<PackageMessage>();
+
+    private Semaphore messageSem = new Semaphore(0);
+    
+    @Configuration
+    public Option[] configuration() {
+        return new Option[] { //
+                //debug(),
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                        .put("whitelist.bypass", "true").asOption(),
+                baseConfiguration(), //
+                defaultOsgiConfigs(), //
+                authorOsgiConfigs() //
+        };
+    }
+
+    @Test
+    public void testDistribute() throws Exception {
+        await().until(this::distribute);
+        
+        try (Closeable packagePoller = createPoller()) {
+            assertPackageReceived();
+        }
+        
+        simulateDiscoveryMessage();
+        await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.singleton(QUEUE_NAME)));
+        assertThat(agent.getQueue(QUEUE_NAME).getStatus().getItemsCount(), equalTo(1));
+    }
+
+    @SuppressWarnings("deprecation")
+    private boolean distribute() throws LoginException {
+        Map<String, Object> authinfo = new HashMap<String, Object>();
+        try (ResourceResolver resolver = resourceResolverFactory.getAdministrativeResourceResolver(authinfo)) {
+            DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
+            DistributionResponse response = distributor.distribute(PUB1_AGENT, resolver, request);
+            log.info(response.getMessage());
+            return response.isSuccessful();
+        }
+    }
+
+    private Closeable createPoller() {
+        HandlerAdapter<PackageMessage> adapter = create(PackageMessage.class, this::handle);
+        return clientProvider.createPoller(TOPIC_PACKAGE, Reset.earliest, adapter);
+    }
+    
+    private void assertPackageReceived() throws InterruptedException {
+        assertTrue(messageSem.tryAcquire(10, TimeUnit.SECONDS));
+        PackageMessage pkg = recordedPackage.get();
+        assertEquals(PackageMessage.ReqType.ADD, pkg.getReqType());
+        String path = pkg.getPathsList().iterator().next();
+        assertEquals("/", path);
+    }
+
+    private void simulateDiscoveryMessage() throws InterruptedException {
+        assertEquals(Collections.emptySet(), toSet(agent.getQueueNames()));
+        MessageSender<DiscoveryMessage> discSender = clientProvider.createSender();
+        DiscoveryMessage disc = createDiscoveryMessage(0);
+        discSender.send(TOPIC_DISCOVERY, disc);
+    }
+
+    private DiscoveryMessage createDiscoveryMessage(long offset) {
+        SubscriberState subState = SubscriberState.newBuilder()
+                .setOffset(offset)
+                .setPubAgentName(PUB1_AGENT)
+                .build();
+        return DiscoveryMessage.newBuilder()
+                .setSubSlingId(SUB1_SLING_ID)
+                .setSubAgentName(SUB1_AGENT)
+                .setSubscriberConfiguration(SubscriberConfiguration
+                        .newBuilder()
+                        .setEditable(false)
+                        .setMaxRetries(-1)
+                        .build())
+                .addSubscriberState(subState)
+                .build();
+    }
+
+    private static <T> Set<T> toSet(final Iterable<T> iterable) {
+        return StreamSupport.stream(iterable.spliterator(), false)
+                            .collect(Collectors.toSet());
+    }
+    
+    void handle(MessageInfo info, PackageMessage message) {
+        if (message.getReqType() == ReqType.ADD) {
+            recordedPackage.set(message);
+            messageSem.release();
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
new file mode 100644
index 0000000..f0911d7
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import javax.inject.Inject;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Starts an author instance, triggers a content distribution and checks that the package arrives
+ * on the journal.
+ */
+@RunWith(PaxExamWithKafka.class)
+@ExamReactorStrategy(PerClass.class)
+public class AuthorRestartTest extends DistributionTestSupport {
+    private static final String PUB1_AGENT = "agent1";
+    private static final String SUB1_SLING_ID = UUID.randomUUID().toString();
+    private static final String SUB1_AGENT = "sub1agent";
+    private static final String QUEUE_NAME = SUB1_SLING_ID + "-" + SUB1_AGENT;
+    private static final int NUM_MESSAGES = 2000;
+
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Inject
+    @Filter(value = "(name=agent1)", timeout = 40000L)
+    private DistributionAgent agent;
+    
+    @Inject
+    @Filter
+    private ResourceResolverFactory resourceResolverFactory;
+    
+    @Inject
+    private MessagingProvider clientProvider;
+    
+    private AtomicReference<PackageMessage> recordedPackage = new AtomicReference<PackageMessage>();
+    private Semaphore messageSem = new Semaphore(0);
+    
+    @Configuration
+    public Option[] configuration() {
+        return new Option[] { //
+                //debug(),
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                        .put("whitelist.bypass", "true").asOption(),
+                baseConfiguration(), //
+                defaultOsgiConfigs(), //
+                authorOsgiConfigs() //
+        };
+    }
+
+    @Test
+    public void testRestartWithExisingMessages() throws Exception {
+        for (int c = 0; c < NUM_MESSAGES; c++) {
+            if (c % 100 == 0) {
+                log.info("Sending message {}", c);
+            }
+            PackageMessage packageMessage = createPackageMessage(c);
+            clientProvider.createSender().send(TOPIC_PACKAGE, packageMessage);
+        }
+        try (Closeable packagePoller = createPoller()) {
+            messageSem.tryAcquire(NUM_MESSAGES, 100, TimeUnit.SECONDS);
+        }
+        await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.emptySet()));
+        DiscoveryMessage disc = createDiscoveryMessage(0);
+        clientProvider.createSender().send(TOPIC_DISCOVERY, disc);
+        await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.singleton(QUEUE_NAME)));
+        
+        log.info("Checking Items in queue");
+        for (int c=0; c<20; c++) {
+            int itemsCount = agent.getQueue(QUEUE_NAME).getStatus().getItemsCount();
+            log.info("Items in queue: {}", itemsCount);
+            assertThat(itemsCount, equalTo(NUM_MESSAGES));
+            Thread.sleep(100);
+        }
+    }
+
+    private Closeable createPoller() {
+        HandlerAdapter<PackageMessage> adapter = create(PackageMessage.class, this::handle);
+        return clientProvider.createPoller(TOPIC_PACKAGE, Reset.earliest, adapter);
+    }
+
+    private DiscoveryMessage createDiscoveryMessage(long offset) {
+        SubscriberState subState = SubscriberState.newBuilder()
+                .setOffset(offset)
+                .setPubAgentName(PUB1_AGENT)
+                .build();
+        return DiscoveryMessage.newBuilder()
+                .setSubSlingId(SUB1_SLING_ID)
+                .setSubAgentName(SUB1_AGENT)
+                .setSubscriberConfiguration(SubscriberConfiguration
+                        .newBuilder()
+                        .setEditable(false)
+                        .setMaxRetries(-1)
+                        .build())
+                .addSubscriberState(subState)
+                .build();
+    }
+
+    private <T> Set<T> toSet(final Iterable<T> iterable) {
+        return StreamSupport.stream(iterable.spliterator(), false)
+                            .collect(Collectors.toSet());
+    }
+    
+    private PackageMessage createPackageMessage(int num) throws IOException {
+        return PackageMessage.newBuilder()
+                .setPkgId("myid" + num)
+                .setPubSlingId("pub1sling")
+                .setPubAgentName(PUB1_AGENT)
+                .setPkgType("journal")
+                .setReqType(PackageMessage.ReqType.ADD)
+                .addAllPaths(Arrays.asList("/test"))
+                .setPkgBinary(ByteString.copyFrom(new byte[100]))
+                .build();
+    }
+    
+    private void handle(MessageInfo info, PackageMessage message) {
+        if (message.getReqType() == ReqType.ADD) {
+            recordedPackage.set(message);
+            messageSem.release();
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
new file mode 100644
index 0000000..a918caa
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
+import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
+import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+
+import com.google.protobuf.ByteString;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.ops4j.pax.exam.util.PathUtils;
+
+import static org.apache.sling.distribution.journal.messages.Messages.*;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+/**
+ * Starts one author instance and one publisher instances.
+ * The author instance also runs the test code.
+ * The test sends 10 invalid distribution package that fails importing on the publisher.
+ * The test sends a clear command for the first 1 entry and checks that the
+ * package was removed from the queue.
+ * The test sends a clear command for all entries and checks that the queue is cleared.
+ *
+ * The test also covers the remove interface while the REMOVABLE capability is supported.
+ */
+@RunWith(ExtPaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class ClearQueueItemTest extends DistributionTestSupport {
+
+    private static final String PUB1_AGENT = "agent1";
+    private static final String SUB1_AGENT = "subscriber-agent1";
+
+    @Inject
+    @Filter(value = "(name=agent1)", timeout = 40000L)
+    DistributionAgent agent;
+
+    @Inject
+    MessagingProvider clientProvider;
+
+    private static TestContainer publisher;
+    private static KafkaLocal kafka;
+
+    @Configuration
+    public Option[] configuration() {
+        return new Option[] { //
+                //debug(),
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                        .put("whitelist.bypass", "true").asOption(),
+                baseConfiguration(), //
+                defaultOsgiConfigs(), //
+                authorOsgiConfigs() //
+        };
+    }
+
+    public Option debug() {
+        return CoreOptions.vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+    }
+
+    @BeforeOsgi
+    public static void startPublisher() throws Exception {
+        kafka = new KafkaLocal();
+        DistributionTestSupport.createTopics();
+        publisher = startPublisher(publisherConfig(8182,  "RemoveQueueItemTest.publisher1", SUB1_AGENT));
+    }
+
+    private static TestContainer startPublisher(Option[] config) {
+        ExamSystem testSystem;
+        try {
+            testSystem = PaxExamRuntime.createTestSystem(config);
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        TestContainer container = PaxExamRuntime.createContainer(testSystem);
+        container.start();
+        return container;
+    }
+
+    @AfterOsgi
+    public static void stopPublishers() throws IOException {
+        if (publisher != null) {
+            publisher.stop();
+        }
+        closeQuietly(kafka);
+    }
+
+    private static Option[] publisherConfig(int httpPort, String instanceName, String agentName) {
+        String workdir = String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), instanceName);
+        return CoreOptions.options( //
+                new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), //
+                defaultOsgiConfigs(), //
+                publishOsgiConfigs(agentName), //
+                CoreOptions.workingDirectory(workdir)
+        );
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testClearItems() throws Exception {
+
+        await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(this::queueNames, containsInAnyOrder(containsString(SUB1_AGENT)));
+
+        String subAgent1QueueName = agent.getQueueNames().iterator().next();
+
+        testClearMethod(subAgent1QueueName);
+        testRemoveMethod(subAgent1QueueName);
+
+    }
+
+    private void testClearMethod(String subAgent1QueueName)
+            throws Exception {
+
+        sendInvalidPackages(10);
+
+        await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(() -> queueSize(subAgent1QueueName, 10));
+
+        String headEntryId1 = agent.getQueue(subAgent1QueueName).getHead().getId();
+
+        agent.getQueue(subAgent1QueueName).clear(1);
+
+        await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(() -> queueSize(subAgent1QueueName, 9));
+
+        String headEntryId2 = agent.getQueue(subAgent1QueueName).getHead().getId();
+        Assert.assertNotEquals(headEntryId1, headEntryId2);
+
+        agent.getQueue(subAgent1QueueName).clear(-1);
+
+        await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(() -> queueEmpty(subAgent1QueueName));
+    }
+
+    private void testRemoveMethod(String subAgent1QueueName)
+            throws Exception {
+
+        /*
+         * This test must be removed when the
+         * REMOVABLE capability is removed
+         */
+
+        sendInvalidPackages(10);
+
+        await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(() -> queueSize(subAgent1QueueName, 10));
+
+        String headEntryId1 = agent.getQueue(subAgent1QueueName).getHead().getId();
+
+        agent.getQueue(subAgent1QueueName).remove(headEntryId1);
+
+        await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(() -> queueSize(subAgent1QueueName, 9));
+
+        String headEntryId2 = agent.getQueue(subAgent1QueueName).getHead().getId();
+        Assert.assertNotEquals(headEntryId1, headEntryId2);
+
+        String tailEntry = lastEntry(agent.getQueue(subAgent1QueueName).getEntries(0, -1)).getId();
+        Set<String> entryIds = new HashSet<>();
+        entryIds.add(tailEntry);
+        entryIds.add(headEntryId2);
+
+        agent.getQueue(subAgent1QueueName).remove(entryIds);
+
+        await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(() -> queueEmpty(subAgent1QueueName));
+    }
+
+    private List<String> queueNames() {
+        List<String> queueNames = new ArrayList<>();
+        agent.getQueueNames().forEach(queueNames::add);
+        return queueNames;
+    }
+
+    private boolean queueEmpty(String queueName) {
+        return agent.getQueue(queueName).getStatus().isEmpty();
+    }
+
+    private boolean queueSize(String queueName, int expectedSize) {
+        return agent.getQueue(queueName).getStatus().getItemsCount() == expectedSize;
+    }
+
+    private DistributionQueueEntry lastEntry(Iterable<DistributionQueueEntry> entries) {
+        Iterator<DistributionQueueEntry> iterator = entries.iterator();
+        DistributionQueueEntry last = null;
+        for (; iterator.hasNext() ; ) {
+            last = iterator.next();
+        }
+        return last;
+    }
+
+    private void sendInvalidPackages(int nb)
+            throws Exception {
+        MessageSender<PackageMessage> sender = clientProvider.createSender();
+        for (int i = 0 ; i < nb ; i++) {
+            sender.send(TOPIC_PACKAGE, newInvalidPackage(PUB1_AGENT));
+        }
+    }
+
+    private PackageMessage newInvalidPackage(String agentId) throws IOException {
+
+        final byte[] pkgBinary = new byte[2048];
+        new Random().nextBytes(pkgBinary);
+        final List<String> paths = Collections.singletonList("/content/invalid");
+        final List<String> deepPaths = Collections.emptyList();
+        final String pkgId = String.format("package-%s", UUID.randomUUID().toString());
+
+        return PackageMessage.newBuilder()
+                .setPubSlingId("slingid")
+                .setPkgId(pkgId)
+                .setPubAgentName(agentId)
+                .setPkgBinary(ByteString.copyFrom(pkgBinary))
+                .setPkgType("journal")
+                .addAllPaths(paths)
+                .setReqType(PackageMessage.ReqType.ADD)
+                .addAllDeepPaths(deepPaths)
+                .setPkgLength(pkgBinary.length)
+                .build();
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/JournalAvailableTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/JournalAvailableTest.java
new file mode 100644
index 0000000..6e8f842
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/JournalAvailableTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.when;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.impl.shared.Topics.TopicsConfiguration;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * Check that JournalAvailableChecker correctly tests for presence of the journal and topics
+ */
+public class JournalAvailableTest {
+    
+    @Spy
+    private MessagingProvider provider;
+
+    @Spy
+    Topics topics;
+    
+    @InjectMocks
+    JournalAvailableChecker checker;
+
+    @Mock
+    private BundleContext context;
+
+    private KafkaLocal kafka;
+    
+    @Before
+    public void before() throws Exception {
+        kafka = new KafkaLocal();
+        DistributionTestSupport.createTopics();
+        this.provider = DistributionTestSupport.createProvider();
+        MockitoAnnotations.initMocks(this);
+        topics.activate(topicsConfiguration(singletonMap("packageTopic", "topic_does_not_exist")));
+    }
+
+    @After
+    public void after() {
+        closeQuietly(kafka);
+    }
+   
+    @Test
+    public void test() throws Exception {
+        mockServiceReg();
+        Callable<Boolean> isAvailable = () -> { checker.run(); return checker.isAvailable(); };
+        checker.activate(context);
+        new Thread(checker).start();
+        // Wait a bit as the journal backend from previous test might still be up
+        await().atMost(30, TimeUnit.SECONDS).until(isAvailable, equalTo(false));
+
+        topics.activate(topicsConfiguration(emptyMap()));
+
+        await().atMost(30, TimeUnit.SECONDS).until(isAvailable, equalTo(true));
+
+        IOUtils.closeQuietly(kafka);
+        await().atMost(60, TimeUnit.SECONDS).until(isAvailable, equalTo(false));
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private void mockServiceReg() {
+        ServiceRegistration<JournalAvailable> reg = Mockito.mock(ServiceRegistration.class);
+        when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.eq(checker), Mockito.isNull(Dictionary.class))).thenReturn(reg);
+    }
+
+    private TopicsConfiguration topicsConfiguration(Map<String,String> props) {
+        return standardConverter()
+                .convert(props)
+                .to(TopicsConfiguration.class);
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/LatePipeAuthorDistributeTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/LatePipeAuthorDistributeTest.java
new file mode 100644
index 0000000..4571e5e
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/LatePipeAuthorDistributeTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assume.assumeTrue;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.ToxiproxyClient;
+
+/**
+ * Test for the journal coming up after the start of an author instance
+ * and the journal being stopped and restarted when author instance already runs.
+ */
+@RunWith(PaxExamWithKafka.class)
+@ExamReactorStrategy(PerClass.class)
+public class LatePipeAuthorDistributeTest extends DistributionTestSupport {
+    private static final String PUB1_AGENT = "agent1";
+
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Inject
+    Distributor distributor;
+
+    @Inject
+    ResourceResolverFactory resourceResolverFactory;
+    
+    @Inject
+    MessagingProvider clientProvider;
+
+    private ToxiproxyClient proxyClient;
+    
+    @Configuration
+    public Option[] configuration() {
+        return new Option[] { //
+                //debug(),
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                        .put("whitelist.bypass", "true").asOption(),
+                baseConfiguration(), //
+                mvn("com.google.code.gson", "gson"),
+                mvn("eu.rekawek.toxiproxy", "toxiproxy-java"),
+                defaultOsgiConfigs("http://localhost:8083"), //
+                authorOsgiConfigs() //
+        };
+    }
+
+    public Option debug() {
+        return CoreOptions.vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+    }
+    
+    @Before
+    public void before() {
+        proxyClient = new ToxiproxyClient();
+        try {
+            proxyClient.getProxies();
+            deleteProxy();
+        } catch (IOException e) {
+            assumeTrue("Toxiproxy server not present. Ignoring", false);
+        }
+        
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDistribute() throws Exception {
+        Map<String, Object> authinfo = new HashMap<String, Object>();
+        try (ResourceResolver resolver = resourceResolverFactory.getAdministrativeResourceResolver(authinfo)) {
+            await().until( () -> bundleContext.getServiceReference(JournalAvailable.class), nullValue());
+            DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, "/");
+            
+            // Fail as journal is not present
+            DistributionResponse response = distributor.distribute(PUB1_AGENT, resolver, request);
+            Assert.assertFalse(response.getMessage(), response.isSuccessful());
+
+            // Succeed as journal is present over proxy
+            proxyClient.createProxy("journal", "localhost:8083", "localhost:8082");
+            log.info("Created proxy for journal");
+            await().until( () -> bundleContext.getServiceReference(JournalAvailable.class), notNullValue());
+            await().atMost(30, SECONDS).until(() -> distributor.distribute(PUB1_AGENT, resolver, request), 
+                    hasProperty("successful", equalTo(true)));
+            
+            // After deleting the proxy distribute fails again
+            deleteProxy();
+            log.info("Deleted proxy for journal");
+            await().atMost(30, TimeUnit.SECONDS).until(() -> distributor.distribute(PUB1_AGENT, resolver, request), 
+                    hasProperty("successful", equalTo(false)));
+            await().until( () -> bundleContext.getServiceReference(JournalAvailable.class), nullValue());
+        } finally {
+            deleteProxy();
+        }
+    }
+
+    private void deleteProxy() throws IOException {
+        Proxy proxy = proxyClient.getProxyOrNull("journal");
+        if (proxy != null) {
+            proxy.delete();
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
new file mode 100644
index 0000000..a013ed5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static org.apache.sling.distribution.packaging.DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka;
+import org.apache.sling.distribution.journal.messages.Messages;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.ops4j.pax.exam.util.Filter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * Starts a publish instance and checks that it can receive and process a PackageMessage from the journal
+ */
+@RunWith(PaxExamWithKafka.class)
+@ExamReactorStrategy(PerClass.class)
+public class PublisherReceiveTest extends DistributionTestSupport {
+    private static final String RESOURCE_PATH = "/my";
+
+    private static final String RESOURCE_TYPE = "sling:Folder";
+
+    Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Inject
+    ResourceResolverFactory resourceResolverFactory;
+    
+    @Inject
+    @Filter("(name=journal)")
+    private DistributionPackageBuilder packageBuilder;
+    
+    @Inject
+    @Filter("(name=subscriber-agent1)")
+    DistributionAgent subscriber;
+    
+    @Inject
+    MessagingProvider provider;
+    /*
+    @Inject
+    ServiceUserMapper serviceUserMapper;
+    */
+    
+    @Configuration
+    public Option[] configuration() {
+        return new Option[] { //
+                //debug(),
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                        .put("whitelist.bypass", "true").asOption(),
+                baseConfiguration(), //
+                defaultOsgiConfigs(), //
+                publishOsgiConfigs() //
+        };
+    }
+
+    public Option debug() {
+        return CoreOptions.vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+    }
+    
+    @Test
+    public void testReceive() throws Exception {
+    	Arrays.asList(bundleContext.getBundles()).stream()
+    	.forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + bundle.getVersion()));
+        DistributionPackage pkg = createDistPackage(RESOURCE_PATH);
+        Messages.PackageMessage pkgMsg = toPackageMessage(pkg, "agent1");
+        provider.createSender().send(TOPIC_PACKAGE, pkgMsg);
+        await().until(() -> getResource(RESOURCE_PATH), notNullValue());
+    }
+
+    /**
+     * Create a resource at the given path, build a DistributionPackage from it and delete the resource again.
+     */
+    private DistributionPackage createDistPackage(String path)
+            throws PersistenceException, DistributionException {
+        try (ResourceResolver resolver = createResolver()){
+            Resource myRes = ResourceUtil.getOrCreateResource(resolver, path, RESOURCE_TYPE, RESOURCE_TYPE, true);
+            log.info("Created resource with path " + myRes.getPath());
+            DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, myRes.getPath());
+            DistributionPackage pkg = packageBuilder.createPackage(resolver, request);
+            resolver.delete(myRes);
+            resolver.commit();
+            return pkg;
+        }
+    }
+
+    private Resource getResource(String path) {
+        try (ResourceResolver resolver = createResolver()) {
+            return resolver.getResource(path);
+        }
+    }
+    
+    @SuppressWarnings("deprecation")
+    private ResourceResolver createResolver() {
+        try {
+            Map<String, Object> authinfo = new HashMap<String, Object>();
+            return resourceResolverFactory.getAdministrativeResourceResolver(authinfo);
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private PackageMessage toPackageMessage(org.apache.sling.distribution.packaging.DistributionPackage pkg, String agentId) throws IOException {
+        final byte[] pkgBinary = IOUtils.toByteArray(pkg.createInputStream());
+        final DistributionPackageInfo pkgInfo = pkg.getInfo();
+        final List<String> paths = Arrays.asList(pkgInfo.getPaths());
+        final List<String> deepPaths = Arrays.asList(pkgInfo.get(PROPERTY_REQUEST_DEEP_PATHS, String[].class));
+        final String pkgId = pkg.getId();
+
+        return PackageMessage.newBuilder()
+                .setPubSlingId("slingid")
+                .setPkgId(pkgId)
+                .setPubAgentName(agentId)
+                .setPkgBinary(ByteString.copyFrom(pkgBinary))
+                .setPkgType(pkg.getType())
+                .addAllPaths(paths)
+                .setReqType(PackageMessage.ReqType.ADD)
+                .addAllDeepPaths(deepPaths)
+                .setPkgLength(pkgBinary.length)
+                .build();
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
new file mode 100644
index 0000000..44af06a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.commons.io.IOUtils.closeQuietly;
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.auth.AuthenticationException;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.Topics;
+import org.apache.sling.distribution.journal.it.ClusterIdCleaner;
+import org.apache.sling.distribution.journal.it.DistributionTestSupport;
+import org.apache.sling.distribution.journal.it.FileUtil;
+import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.ExamSystem;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.PaxExamRuntime;
+import org.ops4j.pax.exam.util.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * - Start a cluster with 1 author and 1 publisher
+ * - Distribute one package
+ * - Stop and clone publisher
+ * - Start 2 publishers with same repo state
+ * - Distribute another package
+ * - Check both have processed both packages
+ */
+public class ScaleUpTest {
+    private static final String AUTHOR = "CloneInstanceTest.author";
+    private static final String PUBLISHER1 = "CloneInstanceTest.publisher1";
+    private static final String PUBLISHER2 = "CloneInstanceTest.publisher2";
+
+    private static final String SUB1_AGENT = "subscriber1-agent1";
+    private static final String SUB2_AGENT = "subscriber2-agent1";
+
+    private static TestContainer author;
+    private static TestContainer publisher1;
+    private static TestContainer publisher2;
+    
+    private static Logger LOG = LoggerFactory.getLogger(ScaleUpTest.class);
+    private MessagingProvider provider;
+    private volatile Semaphore packageReceived = new Semaphore(0);
+    private volatile Semaphore discoveryReceived = new Semaphore(0);
+    private long lastPackageOffset;
+    private Closeable packagePoller;
+    private Closeable discoveryPoller;
+    private volatile Map<String, Long> processedOffsets = new ConcurrentHashMap<>();
+    private KafkaLocal kafka;
+
+    @Before
+    public void before() throws Exception {
+        kafka = new KafkaLocal();
+        DistributionTestSupport.createTopics();
+        this.provider = DistributionTestSupport.createProvider();
+        Topics topics = new Topics();
+        packagePoller = this.provider.createPoller(topics.getPackageTopic(), Reset.earliest, create(PackageMessage.class, this::handle));
+        discoveryPoller = this.provider.createPoller(topics.getDiscoveryTopic(), Reset.earliest, create(DiscoveryMessage.class, this::handleDiscovery));
+    }
+
+    @Ignore
+    @Test
+    public void scaleUp() throws Exception {
+        author = startContainer(authorConfig(8182, AUTHOR));
+        publisher1 = startContainer(publisherConfig(8183, PUBLISHER1, SUB1_AGENT));
+        long offset1 = distribute();
+        await().until(this::numDiscoveredPublishers, equalTo(1));
+        await().atMost(60, SECONDS).until(() -> allProcessed(offset1));
+
+        Path sourceRepo = Paths.get(workDir(PUBLISHER1) + "-repo");
+        Path destRepo = Paths.get(workDir(PUBLISHER2) + "-repo");
+        await().atMost(60, SECONDS).until(() -> sourceRepo.toFile().exists());
+        publisher1.stop();
+        publisher1 = null;
+        this.processedOffsets.clear();
+        clone(sourceRepo, destRepo);
+        
+        publisher1 = startContainer(publisherConfig(8183, PUBLISHER1, SUB1_AGENT));
+        publisher2 = startContainer(publisherConfig(8184, PUBLISHER2, SUB2_AGENT));
+        await().until(this::numDiscoveredPublishers, equalTo(2));
+        await().atMost(60, SECONDS).until(() -> allProcessed(offset1));
+        long offset2 = distribute();
+        await().atMost(60, SECONDS).until(() -> allProcessed(offset2));
+    }
+
+    @After
+    public void after() {
+        closeQuietly(packagePoller, discoveryPoller);
+        stopContainer(publisher1);
+        stopContainer(publisher2);
+        stopContainer(author);
+        closeQuietly(kafka);
+    }
+
+    private int numDiscoveredPublishers() {
+        return processedOffsets.keySet().size();
+    }
+     
+    private boolean allProcessed(long offset) {
+        boolean allMatch = processedOffsets.values().stream().allMatch(processedOffset -> processedOffset >= offset);
+        return allMatch;
+    }
+
+    private void clone(Path sourceRepo, Path destRepo) throws IOException, InvalidFileStoreVersionException {
+        FileUtil.deleteDir(destRepo.toFile());
+        FileUtil.copyFolder(sourceRepo, destRepo);
+        new ClusterIdCleaner(destRepo.toFile()).deleteClusterId();
+    }
+
+    private void tryAcquire(Semaphore sem) throws InterruptedException {
+        assertTrue(sem.tryAcquire(10, TimeUnit.SECONDS));
+    }
+    
+
+    
+    private long distribute() throws Exception {
+        CloseableHttpClient client = HttpClients.createDefault();
+        createNode(client, "/content");
+        distributeNode(client, "/content");
+        client.close();
+        tryAcquire(packageReceived);
+        return lastPackageOffset;
+    }
+    
+    private void createNode(CloseableHttpClient client, String path)
+            throws UnsupportedEncodingException, IOException, ClientProtocolException {
+        HttpPost httpPost = new HttpPost("http://localhost:8182" + path);
+        httpPost.setEntity(new UrlEncodedFormEntity(asList(new BasicNameValuePair("jcr:primaryType","sling:Folder"))));
+        httpPost.addHeader("content-type", "application/x-www-form-urlencoded");
+        CloseableHttpResponse response = client.execute(httpPost);
+        LOG.info(response.getStatusLine().getReasonPhrase());
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_OK));
+    }
+
+    private void distributeNode(CloseableHttpClient client, String path)
+            throws UnsupportedEncodingException, AuthenticationException, IOException, ClientProtocolException {
+        HttpPost httpPost = new HttpPost("http://localhost:8182/libs/sling/distribution/services/agents/agent1");
+        List<BasicNameValuePair> list = Arrays.asList(new BasicNameValuePair("action", "ADD"), new BasicNameValuePair("path", path));
+        httpPost.setEntity(new UrlEncodedFormEntity(list));
+        addAuthHEader(httpPost);
+        httpPost.addHeader("content-type", "application/x-www-form-urlencoded");
+     
+        CloseableHttpResponse response = client.execute(httpPost);
+        LOG.info(response.getStatusLine().getReasonPhrase());
+        assertThat(response.getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_ACCEPTED));
+    }
+
+    private void addAuthHEader(HttpPost httpPost) throws AuthenticationException {
+        httpPost.addHeader(new BasicScheme().authenticate(new UsernamePasswordCredentials("admin", "admin"), httpPost, null));
+    }
+    
+    private void stopContainer(TestContainer container) {
+        if (container != null) {
+            container.stop();
+        }
+    }
+
+    
+    private static TestContainer startContainer(Option[] config) {
+        ExamSystem testSystem;
+        try {
+            testSystem = PaxExamRuntime.createTestSystem(config);
+        } catch (IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        TestContainer container = PaxExamRuntime.createContainer(testSystem);
+        container.start();
+        return container;
+    }
+    
+    private Option[] authorConfig(int httpPort, String instanceName) {
+        return new Option[] { //
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                .put("whitelist.bypass", "true").asOption(),
+                new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(), //
+                DistributionTestSupport.defaultOsgiConfigs(), //
+                DistributionTestSupport.authorOsgiConfigs() //
+        };
+    }
+
+    private Option[] publisherConfig(int httpPort, String instanceName, String agentName) {
+        String workdir = workDir(instanceName);
+        FileUtil.deleteDir(new File(workdir));
+        return CoreOptions.options( //
+                new DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir +"-repo"), //
+                DistributionTestSupport.defaultOsgiConfigs(), //
+                DistributionTestSupport.publishOsgiConfigs(agentName), //
+                CoreOptions.workingDirectory(workdir)
+        );
+    }
+
+    private String workDir(String instanceName) {
+        return String.format("%s/target/paxexam/%s", PathUtils.getBaseDir(), instanceName);
+    }
+    
+    private void handleDiscovery(MessageInfo info, DiscoveryMessage message) {
+        List<SubscriberState> stateList = message.getSubscriberStateList();
+        String slingId = message.getSubSlingId();
+        OptionalLong minOffset = stateList.stream().mapToLong(state -> state.getOffset()).min();
+        LOG.info("DiscoveryMessage slingid {} received {} states {}", slingId, minOffset, stateList);
+        processedOffsets.put(slingId, minOffset.orElseGet(() -> 0l));
+        discoveryReceived .release();
+    }
+    
+    private void handle(MessageInfo info, PackageMessage message) {
+        if (message.getReqType() == PackageMessage.ReqType.TEST) {
+            return;
+        }
+        LOG.info("PackageMessage received {}, paths {}", info.getOffset(), message.getPathsList());
+        this.lastPackageOffset = info.getOffset();
+        packageReceived.release();
+    }
+    
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
new file mode 100644
index 0000000..3e11ddb
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import org.apache.sling.distribution.journal.it.DistributionTestBase;
+import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
+import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
+import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+
+import java.io.IOException;
+
+
+@RunWith(ExtPaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class StagedDistributionFailureTest extends DistributionTestBase {
+
+    private static final String SUB1_AGENT = "subscriber-regular";
+    private static final String SUB2_AGENT = "subscriber-golden";
+
+
+    private static TestContainer publish;
+    private static TestContainer golden_publish;
+
+
+    private static final String TEST_PATH = "/content/mytest";
+
+
+    @BeforeOsgi
+    public static void beforeOsgi() throws Exception {
+        beforeOsgiBase();
+        publish = startPublishInstance(8182,  SUB1_AGENT, false,  SUB2_AGENT);
+
+        new Thread(() -> {
+            // Wait for  at least one item in publish queue before starting golden publish
+            waitQueueItems(8182, SUB1_AGENT, 1);
+
+            LOG.info("Starting golden publish");
+            golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
+        }).start();
+    }
+
+    @AfterOsgi
+    public static void afterOsgi() throws IOException {
+        if (publish != null) {
+            publish.stop();
+        }
+        if (golden_publish != null) {
+            golden_publish.stop();
+        }
+
+        afterOsgiBase();
+    }
+
+    @Before
+    public void before() {
+        createPath(TEST_PATH);
+
+        waitSubQueues(SUB1_AGENT);
+    }
+
+    @Test
+    public void testDistribute() {
+
+        distribute(TEST_PATH);
+
+        waitSubQueues(SUB1_AGENT, SUB2_AGENT);
+        waitEmptySubQueues();
+
+        waitPath(8182, TEST_PATH);
+        waitPath(8183, TEST_PATH);
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
new file mode 100644
index 0000000..1dba0e1
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it.tests;
+
+import org.apache.sling.distribution.journal.it.DistributionTestBase;
+import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
+import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
+import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.TestContainer;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+
+import java.io.IOException;
+
+
+@RunWith(ExtPaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class StagedDistributionTest extends DistributionTestBase {
+
+    private static final String SUB1_AGENT = "subscriber-regular";
+    private static final String SUB2_AGENT = "subscriber-golden";
+
+    private static TestContainer golden_publish;
+    private static TestContainer publish;
+
+
+    private static final String TEST_PATH = "/content/mytest";
+
+
+    @BeforeOsgi
+    public static void beforeOsgi() throws Exception {
+        beforeOsgiBase();
+        publish = startPublishInstance(8182,  SUB1_AGENT, false,  SUB2_AGENT);
+        golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
+
+    }
+
+    @AfterOsgi
+    public static void afterOsgi() throws IOException {
+        if (publish != null) {
+            publish.stop();
+        }
+        if (golden_publish != null) {
+            golden_publish.stop();
+        }
+        afterOsgiBase();
+    }
+
+    @Before
+    public void before() {
+        createPath(TEST_PATH);
+
+        waitSubQueues(SUB1_AGENT, SUB2_AGENT);
+    }
+
+    @Test
+    public void testDistribute() {
+
+        distribute(TEST_PATH);
+
+        waitEmptySubQueues();
+
+        waitPath(8182, TEST_PATH);
+        waitPath(8183, TEST_PATH);
+
+    }
+}
diff --git a/src/test/resources/exam.properties b/src/test/resources/exam.properties
new file mode 100644
index 0000000..c70b3e5
--- /dev/null
+++ b/src/test/resources/exam.properties
@@ -0,0 +1,21 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# Switch off default pax logging
+pax.exam.logging = none
+
+# Override default pax exam bundles to avoid clash of atinject
+pax.exam.system = default
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
new file mode 100644
index 0000000..5f21dd4
--- /dev/null
+++ b/src/test/resources/logback.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration>
+  <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%date %level [%thread] %logger %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="info">
+    <appender-ref ref="console"/>
+  </root>
+
+  <logger name="kafka" level="WARN"/>
+  <logger name="org.apache.zookeeper" level="WARN"/>
+  <logger name="org.apache.kafka.clients" level="WARN"/>
+  <logger name="org.apache.sling.jcr.repoinit.impl" level="WARN"/>
+  <logger name="org.apache.jackrabbit.oak.security.internal" level="WARN"/>
+  <logger name="org.ops4j.pax.exam.cm.internal" level="WARN"/>
+  <logger name="org.apache.sling.settings.impl" level="WARN"/>
+  <logger name="org.apache.sling.engine.impl.parameters" level="WARN"/>
+</configuration>