You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2018/05/02 10:17:31 UTC

svn commit: r1830739 - in /jackrabbit/oak/trunk/oak-store-composite: ./ src/main/java/org/apache/jackrabbit/oak/composite/ src/test/java/org/apache/jackrabbit/oak/composite/

Author: tomekr
Date: Wed May  2 10:17:31 2018
New Revision: 1830739

URL: http://svn.apache.org/viewvc?rev=1830739&view=rev
Log:
OAK-7464: Allow to choose which instance should initialize the default mount

Modified:
    jackrabbit/oak/trunk/oak-store-composite/pom.xml
    jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java
    jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java
    jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java

Modified: jackrabbit/oak/trunk/oak-store-composite/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/pom.xml?rev=1830739&r1=1830738&r2=1830739&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-store-composite/pom.xml Wed May  2 10:17:31 2018
@@ -94,6 +94,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>oak-store-document</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
       <artifactId>oak-store-spi</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -149,5 +154,19 @@
       <artifactId>org.apache.sling.testing.osgi-mock</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>oak-store-document</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <optional>true</optional>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

Modified: jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java?rev=1830739&r1=1830738&r2=1830739&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java (original)
+++ jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/InitialContentMigrator.java Wed May  2 10:17:31 2018
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
 import org.apache.jackrabbit.oak.plugins.migration.FilteringNodeState;
 import org.apache.jackrabbit.oak.plugins.migration.report.LoggingReporter;
 import org.apache.jackrabbit.oak.plugins.migration.report.ReportingNodeState;
@@ -46,6 +47,8 @@ public class InitialContentMigrator {
 
     private static final int LOG_NODE_COPY = Integer.getInteger("oak.upgrade.logNodeCopy", 10000);
 
+    private static final int CLUSTER_ID = Integer.getInteger("oak.composite.seed.clusterId", 1);
+
     private static final Logger LOG = LoggerFactory.getLogger(InitialContentMigrator.class);
 
     private final NodeStore targetNodeStore;
@@ -83,16 +86,41 @@ public class InitialContentMigrator {
     }
 
     private boolean isTargetInitialized() {
-        return targetNodeStore.getRoot().hasChildNode("jcr:system");
+        return targetNodeStore.getRoot().hasChildNode(":composite");
+    }
+
+    private void waitForInitialization() throws IOException {
+        do {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+        } while (!isTargetInitialized());
     }
 
     public void migrate() throws IOException, CommitFailedException {
         if (isTargetInitialized()) {
             LOG.info("The target is already initialized, no need to copy the seed mount");
-            return;
+        } else if (targetNodeStore instanceof DocumentNodeStore) {
+            DocumentNodeStore dns = (DocumentNodeStore) targetNodeStore;
+            int clusterId = dns.getClusterId();
+            LOG.info("The target isn't initialized and the cluster id = {}.", clusterId);
+            if (clusterId == CLUSTER_ID) {
+                LOG.info("This cluster id {} is configured to initialized the repository.", CLUSTER_ID);
+                doMigrate();
+            } else {
+                LOG.info("Waiting until the repository is initialized by instance {}.", CLUSTER_ID);
+                waitForInitialization();
+            }
+        } else {
+            LOG.info("Initializing the default mount.");
+            doMigrate();
         }
+    }
 
-        LOG.info("Initializing the default mount using seed {}", seedMount.getName());
+    protected void doMigrate() throws CommitFailedException {
+        LOG.info("Seed {}", seedMount.getName());
         LOG.info("Include: {}", includePaths);
         LOG.info("Exclude: {}", excludePaths);
         LOG.info("Exclude fragments: {} @ {}", excludeFragments, fragmentPaths);
@@ -107,17 +135,14 @@ public class InitialContentMigrator {
             NodeState checkpointRoot = seedNodeStore.retrieve(checkpointName);
             Map<String, String> checkpointInfo = seedNodeStore.checkpointInfo(checkpointName);
 
-            boolean tracePaths;
             if (previousRoot == initialRoot) {
                 LOG.info("Migrating first checkpoint: {}", checkpointName);
-                tracePaths = true;
             } else {
                 LOG.info("Applying diff to {}", checkpointName);
-                tracePaths = false;
             }
             LOG.info("Checkpoint metadata: {}", checkpointInfo);
 
-            targetRoot = copyDiffToTarget(previousRoot, checkpointRoot, targetRoot, tracePaths);
+            targetRoot = copyDiffToTarget(previousRoot, checkpointRoot, targetRoot);
             previousRoot = checkpointRoot;
 
             String newCheckpointName = targetNodeStore.checkpoint(Long.MAX_VALUE, checkpointInfo);
@@ -128,16 +153,13 @@ public class InitialContentMigrator {
         }
 
         NodeState sourceRoot = seedNodeStore.getRoot();
-        boolean tracePaths;
         if (previousRoot == initialRoot) {
             LOG.info("No checkpoints found; migrating head");
-            tracePaths = true;
         } else {
             LOG.info("Applying diff to head");
-            tracePaths = false;
         }
 
-        targetRoot = copyDiffToTarget(previousRoot, sourceRoot, targetRoot, tracePaths);
+        targetRoot = copyDiffToTarget(previousRoot, sourceRoot, targetRoot);
 
         LOG.info("Rewriting checkpoint names in /:async {}", nameToRevision);
         NodeBuilder targetBuilder = targetRoot.builder();
@@ -157,10 +179,20 @@ public class InitialContentMigrator {
             }
             async.setProperty(e.getKey() + "-temp", tempValues, Type.STRINGS);
         }
+
         targetNodeStore.merge(targetBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        markMigrationAsDone();
+    }
+
+    private void markMigrationAsDone() throws CommitFailedException {
+        NodeState root = targetNodeStore.getRoot();
+        NodeBuilder builder = root.builder();
+        builder.child(":composite");
+        targetNodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
     }
 
-    private NodeState copyDiffToTarget(NodeState before, NodeState after, NodeState targetRoot, boolean tracePaths) throws IOException, CommitFailedException {
+    private NodeState copyDiffToTarget(NodeState before, NodeState after, NodeState targetRoot) throws CommitFailedException {
         NodeBuilder targetBuilder = targetRoot.builder();
         NodeState currentRoot = wrapNodeState(after);
         NodeState baseRoot = wrapNodeState(before);

Modified: jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java?rev=1830739&r1=1830738&r2=1830739&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-store-composite/src/main/java/org/apache/jackrabbit/oak/composite/package-info.java Wed May  2 10:17:31 2018
@@ -55,7 +55,7 @@
  *  This is obviously correct but may be slow.
  *  {@link org.apache.jackrabbit.oak.composite.CompositionContext#getContributingStores(java.lang.String, java.util.function.Function)}
  */
-@Version("0.2.0")
+@Version("0.3.0")
 package org.apache.jackrabbit.oak.composite;
 
 import org.osgi.annotation.versioning.Version;
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java?rev=1830739&r1=1830738&r2=1830739&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java (original)
+++ jackrabbit/oak/trunk/oak-store-composite/src/test/java/org/apache/jackrabbit/oak/composite/InitialContentMigratorTest.java Wed May  2 10:17:31 2018
@@ -18,25 +18,38 @@ package org.apache.jackrabbit.oak.compos
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.mount.Mount;
 import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
 import org.apache.jackrabbit.oak.spi.mount.Mounts;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.junit.Test;
 
 public class InitialContentMigratorTest {
 
     @Test
     public void migrateContentWithCheckpoints() throws IOException, CommitFailedException {
-        
+
         // 1. populate the seed store with
         // .
         // \- first
@@ -44,10 +57,10 @@ public class InitialContentMigratorTest
         // \- third
         //
         // 2. checkpoint before adding the third node
-        // 
-        // 3. the mount only includes the '/first' path, so only the 
+        //
+        // 3. the mount only includes the '/first' path, so only the
         // 'second' and 'third' nodes should be available
-        
+
         MemoryNodeStore seed = new MemoryNodeStore();
         NodeBuilder root = seed.getRoot().builder();
         root.child("first");
@@ -80,5 +93,78 @@ public class InitialContentMigratorTest
         assertFalse("Node /third should not be visible from the migrated checkpoint", checkpointTargetRoot.hasChildNode("third"));
 
     }
-    
+
+    @Test
+    public void clusterInitialization() throws CommitFailedException, InterruptedException {
+        MemoryNodeStore seed = new MemoryNodeStore();
+        NodeBuilder root = seed.getRoot().builder();
+        root.child("first");
+        root.child("second");
+        root.child("third");
+        for (int i = 0; i < 10; i++) {
+            NodeBuilder b = root.child("third").child("a-" + i);
+            for (int j = 0; j < 50; j++) {
+                b.child(("b-") + j);
+            }
+        }
+        seed.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        MountInfoProvider mip = Mounts.newBuilder().mount("seed", "/first").build();
+
+        DocumentStore sharedStore = new MemoryDocumentStore();
+        List<DocumentNodeStore> stores = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            stores.add(new DocumentMK.Builder()
+                    .setDocumentStore(sharedStore)
+                    .setClusterId(i + 1)
+                    .build());
+        }
+
+        List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
+        AtomicBoolean migrated = new AtomicBoolean();
+        List<Thread> threads = stores.stream()
+                .map(dns -> (Runnable) () -> runMigration(dns, seed, mip.getMountByName("seed"), exceptions, migrated))
+                .map(Thread::new)
+                .collect(Collectors.toList());
+
+        threads.stream().forEach(Thread::start);
+        for (Thread t : threads) {
+            t.join();
+        }
+
+        assertTrue("Exception list should be empty: " + exceptions, exceptions.isEmpty());
+
+        for (DocumentNodeStore dns : stores) {
+            NodeState targetRoot = dns.getRoot();
+
+            // verify that the 'second' and 'third' nodes are visible in the migrated store
+            assertFalse("Node /first should not have been migrated", targetRoot.hasChildNode("first"));
+            assertTrue("Node /second should have been migrated", targetRoot.hasChildNode("second"));
+            assertTrue("Node /third should have been migrated", targetRoot.hasChildNode("third"));
+
+            for (int i = 0; i < 10; i++) {
+                for (int j = 0; j < 10; j++) {
+                    assertTrue("Node /third/" + i + "/" + j + " should have been migrated",
+                            targetRoot.getChildNode("third").getChildNode("a-" + i).hasChildNode("b-" + j));
+                }
+            }
+
+            dns.dispose();
+        }
+    }
+
+    private void runMigration(NodeStore target, NodeStore seed, Mount seedMount, List<Throwable> exceptions, AtomicBoolean migrated) {
+        try {
+            InitialContentMigrator icm = new InitialContentMigrator(target, seed, seedMount) {
+                protected void doMigrate() throws CommitFailedException {
+                    if (migrated.getAndSet(true)) {
+                        fail("doMigrate() has been called more than once.");
+                    }
+                    super.doMigrate();
+                }
+            };
+            icm.migrate();
+        } catch (Throwable e) {
+            exceptions.add(e);
+        }
+    }
 }