You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by dp...@apache.org on 2011/11/29 17:41:22 UTC

svn commit: r1207957 - in /jackrabbit/trunk/jackrabbit-core/src: main/java/org/apache/jackrabbit/core/cluster/ main/java/org/apache/jackrabbit/core/journal/ test/java/org/apache/jackrabbit/core/cluster/

Author: dpfister
Date: Tue Nov 29 16:41:20 2011
New Revision: 1207957

URL: http://svn.apache.org/viewvc?rev=1207957&view=rev
Log:
JCR-3138 - Skip sync delay when changes are found
- made behaviour subclass overridable
- added test case

Added:
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java   (with props)
Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?rev=1207957&r1=1207956&r2=1207957&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Tue Nov 29 16:41:20 2011
@@ -187,6 +187,11 @@ public class ClusterNode implements Runn
      * Record deserializer.
      */
     private ClusterRecordDeserializer deserializer = new ClusterRecordDeserializer();
+    
+    /**
+     * Flag indicating whether sync is manual.
+     */
+    private boolean disableAutoSync;
 
     /**
      * Initialize this cluster node.
@@ -243,6 +248,13 @@ public class ClusterNode implements Runn
     public long getStopDelay() {
         return stopDelay;
     }
+    
+    /**
+     * Disable periodic background synchronization. Used for testing purposes, only.
+     */
+    protected void disableAutoSync() {
+        disableAutoSync = true;
+    }
 
     /**
      * Starts this cluster node.
@@ -253,11 +265,12 @@ public class ClusterNode implements Runn
         if (status == NONE) {
             sync();
 
-            Thread t = new Thread(this, "ClusterNode-" + clusterNodeId);
-            t.setDaemon(true);
-            t.start();
-            syncThread = t;
-
+            if (!disableAutoSync) {
+                Thread t = new Thread(this, "ClusterNode-" + clusterNodeId);
+                t.setDaemon(true);
+                t.start();
+                syncThread = t;
+            }
             status = STARTED;
         }
     }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java?rev=1207957&r1=1207956&r2=1207957&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java Tue Nov 29 16:41:20 2011
@@ -217,39 +217,59 @@ public abstract class AbstractJournal im
      * @throws JournalException if an error occurs
      */
     protected void doSync(long startRevision) throws JournalException {
-        RecordIterator iterator = getRecords(startRevision);
-        long stopRevision = Long.MIN_VALUE;
-
-        try {
-            while (iterator.hasNext()) {
-                Record record = iterator.nextRecord();
-                if (record.getJournalId().equals(id)) {
-                    log.info("Record with revision '" + record.getRevision()
-                            + "' created by this journal, skipped.");
-                } else {
-                    RecordConsumer consumer = getConsumer(record.getProducerId());
-                    if (consumer != null) {
-                        try {
-                            consumer.consume(record);
-                        } catch (IllegalStateException e) {
-                            log.error("Could not synchronize to revision: " + record.getRevision() + " due illegal state of RecordConsumer.");
-                            return;
+        for (;;) {
+            RecordIterator iterator = getRecords(startRevision);
+            long stopRevision = Long.MIN_VALUE;
+    
+            try {
+                while (iterator.hasNext()) {
+                    Record record = iterator.nextRecord();
+                    if (record.getJournalId().equals(id)) {
+                        log.info("Record with revision '" + record.getRevision()
+                                + "' created by this journal, skipped.");
+                    } else {
+                        RecordConsumer consumer = getConsumer(record.getProducerId());
+                        if (consumer != null) {
+                            try {
+                                consumer.consume(record);
+                            } catch (IllegalStateException e) {
+                                log.error("Could not synchronize to revision: " + record.getRevision() + " due illegal state of RecordConsumer.");
+                                return;
+                            }
                         }
                     }
+                    stopRevision = record.getRevision();
                 }
-                stopRevision = record.getRevision();
+            } finally {
+                iterator.close();
             }
-        } finally {
-            iterator.close();
-        }
+    
+            if (stopRevision > 0) {
+                for (RecordConsumer consumer : consumers.values()) {
+                    consumer.setRevision(stopRevision);
+                }
+                log.info("Synchronized to revision: " + stopRevision);
 
-        if (stopRevision > 0) {
-            for (RecordConsumer consumer : consumers.values()) {
-                consumer.setRevision(stopRevision);
+                if (syncAgainOnNewRecords()) {
+                    // changes detected, sync again
+                    startRevision = stopRevision;
+                    continue;
+                }
             }
-            log.info("Synchronized to revision: " + stopRevision);
+            break;
         }
     }
+    
+    /**
+     * Return a flag indicating whether synchronization should continue
+     * in a loop until no more new records are found. Subclass overridable.
+     * 
+     * @return <code>true</code> if synchronization should continue;
+     *         <code>false</code> otherwise
+     */
+    protected boolean syncAgainOnNewRecords() {
+        return false;
+    }
 
     /**
      * Lock the journal revision, disallowing changes from other sources until

Added: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java?rev=1207957&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java Tue Nov 29 16:41:20 2011
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import java.util.ArrayList;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.jackrabbit.core.cluster.SimpleEventListener.LockEvent;
+import org.apache.jackrabbit.core.config.ClusterConfig;
+import org.apache.jackrabbit.core.id.NodeId;
+import org.apache.jackrabbit.core.journal.Journal;
+import org.apache.jackrabbit.core.journal.JournalFactory;
+import org.apache.jackrabbit.core.journal.MemoryJournal;
+import org.apache.jackrabbit.core.journal.Record;
+import org.apache.jackrabbit.core.journal.RecordConsumer;
+import org.apache.jackrabbit.core.journal.MemoryJournal.MemoryRecord;
+import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver;
+import org.apache.jackrabbit.test.JUnitTest;
+
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
+/**
+ * Test cases for cluster synchronization.
+ */
+public class ClusterSyncTest extends JUnitTest {
+
+    /** Defaut workspace name. */
+    private static final String DEFAULT_WORKSPACE = "default";
+
+    /** Default sync delay: 5 seconds. */
+    private static final long SYNC_DELAY = 5000;
+
+    /** Master node. */
+    private ClusterNode master;
+
+    /** Slave node. */
+    /* avoid synthetic accessor */ ClusterNode slave;
+
+    /** Records shared among multiple memory journals. */
+    private final ArrayList<MemoryRecord> records = new ArrayList<MemoryRecord>();
+    
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void setUp() throws Exception {
+        master = createClusterNode("master", false);
+        master.start();
+
+        slave = createClusterNode("slave", true);
+        slave.start();
+
+        super.setUp();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        if (slave != null) {
+            slave.stop();
+        }
+        if (master != null) {
+            master.stop();
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Verify that sync() on a cluster node will continue fetching results until no more
+     * changes are detected. 
+     * 
+     * @throws Exception
+     */
+    public void testSyncAllChanges() throws Exception {
+        // create channel on master and slave
+        LockEventChannel channel = master.createLockChannel(DEFAULT_WORKSPACE);
+        slave.createLockChannel(DEFAULT_WORKSPACE).setListener(new SimpleEventListener());
+        
+        // add blocking consumer to slave, this will block on the first non-empty sync()
+        BlockingConsumer consumer = new BlockingConsumer();
+        slave.getJournal().register(consumer);
+        
+        // add first entry
+        LockEvent event = new LockEvent(NodeId.randomId(), true, "admin");
+        channel.create(event.getNodeId(), event.isDeep(), event.getUserId()).ended(true);
+        
+        // start a manual sync on the slave and ...
+        Thread syncOnce = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    slave.sync();
+                } catch (ClusterException e) {
+                    /* ignore */
+                }
+            }
+        });
+        syncOnce.start();
+
+        // ... wait until it blocks
+        consumer.waitUntilBlocked();
+        
+        // add second entry
+        event = new LockEvent(NodeId.randomId(), true, "admin");
+        channel.create(event.getNodeId(), event.isDeep(), event.getUserId()).ended(true);
+        
+        // now unblock slave
+        consumer.unblock();
+
+        // wait for the sync to finish
+        syncOnce.join();
+        
+        assertEquals(master.getRevision(), slave.getRevision());
+    }
+
+    /**
+     * Create a cluster node, with a memory journal referencing a list of records.
+     *
+     * @param id cluster node id
+     * @param records memory journal's list of records
+     * @param disableAutoSync if <code>true</code> background synchronization is disabled
+     */
+    private ClusterNode createClusterNode(String id, boolean disableAutoSync) throws Exception {
+        final MemoryJournal journal = new MemoryJournal() {
+            protected boolean syncAgainOnNewRecords() {
+                return true;
+            }
+        };
+        JournalFactory jf = new JournalFactory() {
+            public Journal getJournal(NamespaceResolver resolver)
+                    throws RepositoryException {
+                return journal;
+            }
+        };
+        ClusterConfig cc = new ClusterConfig(id, SYNC_DELAY, jf);
+        SimpleClusterContext context = new SimpleClusterContext(cc);
+
+        journal.setRepositoryHome(context.getRepositoryHome());
+        journal.init(id, context.getNamespaceResolver());
+        journal.setRecords(records);
+        
+        ClusterNode clusterNode = new ClusterNode();
+        clusterNode.init(context);
+        if (disableAutoSync) {
+            clusterNode.disableAutoSync();
+        }
+        return clusterNode;
+    }
+    
+    /**
+     * Custom consumer that will block inside the journal's sync() method
+     * until it is unblocked.
+     */
+    static class BlockingConsumer implements RecordConsumer {
+
+        private final Latch blockLatch = new Latch();
+        private final Latch unblockLatch = new Latch();
+        private long revision;
+        
+        public String getId() {
+            return "CUSTOM";
+        }
+
+        public long getRevision() {
+            return revision;
+        }
+
+        public void consume(Record record) {
+            /* nothing to be done here */
+        }
+
+        public void setRevision(long revision) {
+            blockLatch.release();
+            
+            try {
+                unblockLatch.acquire();
+            } catch (InterruptedException e) {
+                /* ignore */
+            }
+            this.revision = revision;
+        }
+        
+        public void waitUntilBlocked() throws InterruptedException {
+            blockLatch.acquire();
+        }
+        
+        public void unblock() {
+            unblockLatch.release();
+        }
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/ClusterSyncTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev Url

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java?rev=1207957&r1=1207956&r2=1207957&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/cluster/TestAll.java Tue Nov 29 16:41:20 2011
@@ -38,6 +38,7 @@ public class TestAll extends TestCase {
         TestSuite suite = new TestSuite();
 
         suite.addTestSuite(ClusterRecordTest.class);
+        suite.addTestSuite(ClusterSyncTest.class);
         suite.addTestSuite(DbClusterTest.class);
 
         return suite;