You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2018/05/02 10:17:31 UTC
svn commit: r1830739 - in /jackrabbit/oak/trunk/oak-store-composite: ./
src/main/java/org/apache/jackrabbit/oak/composite/
src/test/java/org/apache/jackrabbit/oak/composite/
Author: tomekr
Date: Wed May 2 10:17:31 2018
New Revision: 1830739
URL: http://svn.apache.org/viewvc?rev=1830739&view=rev
Log:
OAK-7464: Allow to choose which instance should initialize the default mount
Modified:
jackrabbit/oak/trunk/oak-store-composite/pom.xml
jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java
jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java
jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java
Modified: jackrabbit/oak/trunk/oak-store-composite/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/pom.xml?rev=1830739&r1=1830738&r2=1830739&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-store-composite/pom.xml Wed May 2 10:17:31 2018
@@ -94,6 +94,11 @@
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-store-document</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-store-spi</artifactId>
<version>${project.version}</version>
</dependency>
@@ -149,5 +154,19 @@
<artifactId>org.apache.sling.testing.osgi-mock</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-store-document</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <optional>true</optional>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
Modified: jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java?rev=1830739&r1=1830738&r2=1830739&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java (original)
+++ jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java Wed May 2 10:17:31 2018
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.migration.FilteringNodeState;
import org.apache.jackrabbit.oak.plugins.migration.report.LoggingReporter;
import org.apache.jackrabbit.oak.plugins.migration.report.ReportingNodeState;
@@ -46,6 +47,8 @@ public class InitialContentMigrator {
private static final int LOG_NODE_COPY = Integer.getInteger("oak.upgrade.logNodeCopy", 10000);
+ private static final int CLUSTER_ID = Integer.getInteger("oak.composite.seed.clusterId", 1);
+
private static final Logger LOG = LoggerFactory.getLogger(InitialContentMigrator.class);
private final NodeStore targetNodeStore;
@@ -83,16 +86,41 @@ public class InitialContentMigrator {
}
+ /** Tells whether the {@code :composite} marker node exists under the target root. */
private boolean isTargetInitialized() {
- return targetNodeStore.getRoot().hasChildNode("jcr:system");
+ NodeState currentTargetRoot = targetNodeStore.getRoot();
+ return currentTargetRoot.hasChildNode(":composite");
+ }
+
+ /**
+ * Blocks until {@link #isTargetInitialized()} reports that the target has been
+ * initialized, sleeping 100 ms before each check.
+ *
+ * @throws IOException if the waiting thread is interrupted; the interrupt
+ * status of the thread is restored before the exception is thrown
+ */
+ private void waitForInitialization() throws IOException {
+ do {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // Thread.sleep() clears the interrupt flag; set it again so that the
+ // callers up the stack can still see that an interrupt was requested.
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ }
+ } while (!isTargetInitialized());
}
+ /**
+ * Initializes the default mount from the seed mount, unless the target is
+ * already initialized.
+ * <p>
+ * If the target is a {@link DocumentNodeStore}, only the instance whose cluster
+ * id equals the {@code oak.composite.seed.clusterId} system property (default 1)
+ * runs {@link #doMigrate()}; every other instance waits until the target gets
+ * initialized. Any other kind of target store is migrated directly.
+ *
+ * @throws IOException if the thread is interrupted while waiting for the
+ * initialization done by another instance
+ * @throws CommitFailedException if the migration can't be merged into the target
+ */
public void migrate() throws IOException, CommitFailedException {
if (isTargetInitialized()) {
LOG.info("The target is already initialized, no need to copy the seed mount");
- return;
+ } else if (targetNodeStore instanceof DocumentNodeStore) {
+ DocumentNodeStore dns = (DocumentNodeStore) targetNodeStore;
+ int clusterId = dns.getClusterId();
+ LOG.info("The target isn't initialized and the cluster id = {}.", clusterId);
+ if (clusterId == CLUSTER_ID) {
+ LOG.info("This cluster id {} is configured to initialize the repository.", CLUSTER_ID);
+ doMigrate();
+ } else {
+ LOG.info("Waiting until the repository is initialized by instance {}.", CLUSTER_ID);
+ waitForInitialization();
+ }
+ } else {
+ LOG.info("Initializing the default mount.");
+ doMigrate();
}
+ }
- LOG.info("Initializing the default mount using seed {}", seedMount.getName());
+ protected void doMigrate() throws CommitFailedException {
+ LOG.info("Seed {}", seedMount.getName());
LOG.info("Include: {}", includePaths);
LOG.info("Exclude: {}", excludePaths);
LOG.info("Exclude fragments: {} @ {}", excludeFragments, fragmentPaths);
@@ -107,17 +135,14 @@ public class InitialContentMigrator {
NodeState checkpointRoot = seedNodeStore.retrieve(checkpointName);
Map<String, String> checkpointInfo = seedNodeStore.checkpointInfo(checkpointName);
- boolean tracePaths;
if (previousRoot == initialRoot) {
LOG.info("Migrating first checkpoint: {}", checkpointName);
- tracePaths = true;
} else {
LOG.info("Applying diff to {}", checkpointName);
- tracePaths = false;
}
LOG.info("Checkpoint metadata: {}", checkpointInfo);
- targetRoot = copyDiffToTarget(previousRoot, checkpointRoot, targetRoot, tracePaths);
+ targetRoot = copyDiffToTarget(previousRoot, checkpointRoot, targetRoot);
previousRoot = checkpointRoot;
String newCheckpointName = targetNodeStore.checkpoint(Long.MAX_VALUE, checkpointInfo);
@@ -128,16 +153,13 @@ public class InitialContentMigrator {
}
NodeState sourceRoot = seedNodeStore.getRoot();
- boolean tracePaths;
if (previousRoot == initialRoot) {
LOG.info("No checkpoints found; migrating head");
- tracePaths = true;
} else {
LOG.info("Applying diff to head");
- tracePaths = false;
}
- targetRoot = copyDiffToTarget(previousRoot, sourceRoot, targetRoot, tracePaths);
+ targetRoot = copyDiffToTarget(previousRoot, sourceRoot, targetRoot);
LOG.info("Rewriting checkpoint names in /:async {}", nameToRevision);
NodeBuilder targetBuilder = targetRoot.builder();
@@ -157,10 +179,20 @@ public class InitialContentMigrator {
}
async.setProperty(e.getKey() + "-temp", tempValues, Type.STRINGS);
}
+
targetNodeStore.merge(targetBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+ markMigrationAsDone();
+ }
+
+ /**
+ * Adds the {@code :composite} child node to the root of the target store and
+ * merges the change.
+ */
+ private void markMigrationAsDone() throws CommitFailedException {
+ NodeBuilder markerBuilder = targetNodeStore.getRoot().builder();
+ markerBuilder.child(":composite");
+ targetNodeStore.merge(markerBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
}
- private NodeState copyDiffToTarget(NodeState before, NodeState after, NodeState targetRoot, boolean tracePaths) throws IOException, CommitFailedException {
+ private NodeState copyDiffToTarget(NodeState before, NodeState after, NodeState targetRoot) throws CommitFailedException {
NodeBuilder targetBuilder = targetRoot.builder();
NodeState currentRoot = wrapNodeState(after);
NodeState baseRoot = wrapNodeState(before);
Modified: jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java?rev=1830739&r1=1830738&r2=1830739&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java Wed May 2 10:17:31 2018
@@ -55,7 +55,7 @@
* This is obviously correct but may be slow.
* {@link org.apache.jackrabbit.oak.composite.CompositionContext#getContributingStores(java.lang.String, java.util.function.Function)}
*/
-@Version("0.2.0")
+@Version("0.3.0")
package org.apache.jackrabbit.oak.composite;
import org.osgi.annotation.versioning.Version;
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java?rev=1830739&r1=1830738&r2=1830739&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java Wed May 2 10:17:31 2018
@@ -18,25 +18,38 @@ package org.apache.jackrabbit.oak.compos
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.mount.Mount;
import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
import org.apache.jackrabbit.oak.spi.mount.Mounts;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.junit.Test;
public class InitialContentMigratorTest {
@Test
public void migrateContentWithCheckpoints() throws IOException, CommitFailedException {
-
+
// 1. populate the seed store with
// .
// \- first
@@ -44,10 +57,10 @@ public class InitialContentMigratorTest
// \- third
//
// 2. checkpoint before adding the third node
- //
- // 3. the mount only includes the '/first' path, so only the
+ //
+ // 3. the mount only includes the '/first' path, so only the
// 'second' and 'third' nodes should be available
-
+
MemoryNodeStore seed = new MemoryNodeStore();
NodeBuilder root = seed.getRoot().builder();
root.child("first");
@@ -80,5 +93,78 @@ public class InitialContentMigratorTest
assertFalse("Node /third should not be visible from the migrated checkpoint", checkpointTargetRoot.hasChildNode("third"));
}
-
+
+ /**
+ * Runs the migration concurrently on ten DocumentNodeStore instances (cluster
+ * ids 1..10) sharing one MemoryDocumentStore and verifies that no thread
+ * failed and that every instance sees the complete migrated content.
+ */
+ @Test
+ public void clusterInitialization() throws CommitFailedException, InterruptedException {
+ MemoryNodeStore seed = new MemoryNodeStore();
+ NodeBuilder root = seed.getRoot().builder();
+ root.child("first");
+ root.child("second");
+ root.child("third");
+ for (int i = 0; i < 10; i++) {
+ NodeBuilder b = root.child("third").child("a-" + i);
+ for (int j = 0; j < 50; j++) {
+ b.child("b-" + j);
+ }
+ }
+ seed.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+ MountInfoProvider mip = Mounts.newBuilder().mount("seed", "/first").build();
+
+ DocumentStore sharedStore = new MemoryDocumentStore();
+ List<DocumentNodeStore> stores = new ArrayList<>();
+ try {
+ for (int i = 0; i < 10; i++) {
+ stores.add(new DocumentMK.Builder()
+ .setDocumentStore(sharedStore)
+ .setClusterId(i + 1)
+ .build());
+ }
+
+ List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
+ AtomicBoolean migrated = new AtomicBoolean();
+ List<Thread> threads = stores.stream()
+ .map(dns -> (Runnable) () -> runMigration(dns, seed, mip.getMountByName("seed"), exceptions, migrated))
+ .map(Thread::new)
+ .collect(Collectors.toList());
+
+ threads.forEach(Thread::start);
+ for (Thread t : threads) {
+ t.join();
+ }
+
+ assertTrue("Exception list should be empty: " + exceptions, exceptions.isEmpty());
+
+ for (DocumentNodeStore dns : stores) {
+ NodeState targetRoot = dns.getRoot();
+
+ // verify that the 'second' and 'third' nodes are visible in the migrated store
+ assertFalse("Node /first should not have been migrated", targetRoot.hasChildNode("first"));
+ assertTrue("Node /second should have been migrated", targetRoot.hasChildNode("second"));
+ assertTrue("Node /third should have been migrated", targetRoot.hasChildNode("third"));
+
+ // check every child created in the seed, not only the first ten per parent
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 50; j++) {
+ assertTrue("Node /third/a-" + i + "/b-" + j + " should have been migrated",
+ targetRoot.getChildNode("third").getChildNode("a-" + i).hasChildNode("b-" + j));
+ }
+ }
+ }
+ } finally {
+ // dispose the stores even if an assertion above has failed
+ for (DocumentNodeStore dns : stores) {
+ dns.dispose();
+ }
+ }
+ }
+
+ /**
+ * Runs the migration against the given target store. The migrator fails the
+ * test if doMigrate() is invoked after the shared flag has already been set.
+ * Every throwable, including that assertion failure, is collected in the
+ * passed list rather than propagated, because this method is used as a thread body.
+ */
+ private void runMigration(NodeStore target, NodeStore seed, Mount seedMount, List<Throwable> exceptions, AtomicBoolean migrated) {
+ try {
+ InitialContentMigrator icm = new InitialContentMigrator(target, seed, seedMount) {
+ @Override
+ protected void doMigrate() throws CommitFailedException {
+ if (migrated.getAndSet(true)) {
+ fail("doMigrate() has been called more than once.");
+ }
+ super.doMigrate();
+ }
+ };
+ icm.migrate();
+ } catch (Throwable e) {
+ exceptions.add(e);
+ }
+ }
}