You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by sc...@apache.org on 2012/11/05 13:17:03 UTC
svn commit: r1405767 - in
/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core:
cluster/ClusterNode.java journal/AbstractJournal.java
journal/DatabaseJournal.java journal/Journal.java
Author: schans
Date: Mon Nov 5 12:17:03 2012
New Revision: 1405767
URL: http://svn.apache.org/viewvc?rev=1405767&view=rev
Log:
JCR-3440: Add a special case for cluster startup. This prevents the cluster node from taking the global cluster lock on every sync.
Modified:
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?rev=1405767&r1=1405766&r2=1405767&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Mon Nov 5 12:17:03 2012
@@ -274,7 +274,7 @@ public class ClusterNode implements Runn
*/
public synchronized void start() throws ClusterException {
if (status == NONE) {
- sync();
+ syncOnStartup();
if (!disableAutoSync) {
Thread t = new Thread(this, "ClusterNode-" + clusterNodeId);
@@ -315,12 +315,14 @@ public class ClusterNode implements Runn
}
}
- /**
+ /**
* Synchronize contents from journal.
- *
+ *
+ * @param startup <code>true</code> if the cluster node is syncing on startup,
+ * <code>false</code> if it is doing a normal sync.
* @throws ClusterException if an error occurs
*/
- public void sync() throws ClusterException {
+ private void internalSync(boolean startup) throws ClusterException {
int count = syncCount.get();
try {
@@ -335,13 +337,32 @@ public class ClusterNode implements Runn
// while we were waiting to acquire the syncLock.
if (count == syncCount.get()) {
syncCount.incrementAndGet();
- journal.sync();
+ journal.sync(startup);
}
} catch (JournalException e) {
throw new ClusterException(e.getMessage(), e.getCause());
} finally {
syncLock.release();
}
+
+ }
+
+ /**
+ * Synchronize contents from journal.
+ *
+ * @throws ClusterException if an error occurs
+ */
+ public void sync() throws ClusterException {
+ internalSync(false);
+ }
+
+ /**
+ * Synchronize contents from journal when a {@link ClusterNode} starts up.
+ *
+ * @throws ClusterException if an error occurs
+ */
+ public void syncOnStartup() throws ClusterException {
+ internalSync(true);
}
/**
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java?rev=1405767&r1=1405766&r2=1405767&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java Mon Nov 5 12:17:03 2012
@@ -177,22 +177,25 @@ public abstract class AbstractJournal im
return minimalRevision;
}
+
/**
* {@inheritDoc}
*/
- public void sync() throws JournalException {
+ public void sync(boolean startup) throws JournalException {
for (;;) {
if (internalVersionManager != null) {
VersioningLock.ReadLock lock =
internalVersionManager.acquireReadLock();
try {
- internalSync();
+ internalSync(startup);
} finally {
lock.release();
}
} else {
- internalSync();
+ internalSync(startup);
}
+ // startup sync already done, don't do it again
+ startup = false;
if (syncAgainOnNewRecords()) {
// sync again if there are more records available
RecordIterator it = getRecords(getMinimalRevision());
@@ -208,7 +211,7 @@ public abstract class AbstractJournal im
}
}
- private void internalSync() throws JournalException {
+ private void internalSync(boolean startup) throws JournalException {
try {
rwLock.readLock().acquire();
} catch (InterruptedException e) {
@@ -216,13 +219,21 @@ public abstract class AbstractJournal im
throw new JournalException(msg, e);
}
try {
- doSync(getMinimalRevision());
+ doSync(getMinimalRevision(), startup);
} finally {
rwLock.readLock().release();
}
}
+
+ protected void doSync(long startRevision, boolean startup) throws JournalException {
+ // By default, ignore the startup parameter for backwards compatibility;
+ // it is only needed by persistence backends that need special treatment on startup.
+ doSync(startRevision);
+ }
+
/**
+ *
* Synchronize contents from journal. May be overridden by subclasses.
*
* @param startRevision start point (exclusive)
@@ -241,16 +252,13 @@ public abstract class AbstractJournal im
} else {
RecordConsumer consumer = getConsumer(record.getProducerId());
if (consumer != null) {
- try {
- consumer.consume(record);
- } catch (IllegalStateException e) {
- log.error("Could not synchronize to revision: " + record.getRevision() + " due illegal state of RecordConsumer.");
- return;
- }
+ consumer.consume(record);
}
}
stopRevision = record.getRevision();
}
+ } catch (IllegalStateException e) {
+ log.error("Could not synchronize to revision: " + (stopRevision + 1) + " due illegal state of RecordConsumer.");
} finally {
iterator.close();
}
@@ -259,7 +267,7 @@ public abstract class AbstractJournal im
for (RecordConsumer consumer : consumers.values()) {
consumer.setRevision(stopRevision);
}
- log.info("Synchronized to revision: " + stopRevision);
+ log.info("Synchronized from revision " + startRevision + " to revision: " + stopRevision);
}
}
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java?rev=1405767&r1=1405766&r2=1405767&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java Mon Nov 5 12:17:03 2012
@@ -437,24 +437,31 @@ public class DatabaseJournal extends Abs
/**
* Synchronize contents from journal. May be overridden by subclasses.
- * Override to do it in batchMode, since some databases (PSQL) when
+ * Do the initial sync in batchMode, since some databases (PSQL) when
* not in transactional mode, load all results in memory which causes
- * out of memory.
+ * out of memory. See JCR-2832
*
* @param startRevision start point (exclusive)
+ * @param startup <code>true</code> if the cluster node is syncing on startup,
+ * <code>false</code> if it is doing a normal sync.
* @throws JournalException if an error occurs
*/
@Override
- protected void doSync(long startRevision) throws JournalException {
- try {
- startBatch();
+ protected void doSync(long startRevision, boolean startup) throws JournalException {
+ if (!startup) {
+ // if the cluster node is not starting do a normal sync
+ doSync(startRevision);
+ } else {
try {
- super.doSync(startRevision);
- } finally {
- endBatch(true);
+ startBatch();
+ try {
+ doSync(startRevision);
+ } finally {
+ endBatch(true);
+ }
+ } catch (SQLException e) {
+ throw new JournalException("Couldn't sync the cluster node", e);
}
- } catch (SQLException e) {
- throw new JournalException("Couldn't sync the cluster node", e);
}
}
Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java?rev=1405767&r1=1405766&r2=1405767&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java Mon Nov 5 12:17:03 2012
@@ -54,10 +54,15 @@ public interface Journal {
* revision with the revisions of all registered consumers and invoke
* their {@link RecordConsumer#consume} method when their identifier
* matches the one found in the records.
+ * The startup flag allows for separate treatment of the initial sync
+ * when the cluster node starts up. This might be needed, for example,
+ * when there are a lot of old revisions in a database.
*
+ * @param startup <code>true</code> if the cluster node is syncing on startup,
+ * <code>false</code> if it is doing a normal sync.
* @throws JournalException if an error occurs
*/
- void sync() throws JournalException;
+ void sync(boolean startup) throws JournalException;
/**
* Return the record producer for a given identifier.