You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2011/01/05 10:25:28 UTC
svn commit: r1055369 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/jcr/
main/resources/OSGI-INF/metatype/ test/java/org/apache/sling/event/impl/jobs/
Author: cziegeler
Date: Wed Jan 5 09:25:28 2011
New Revision: 1055369
URL: http://svn.apache.org/viewvc?rev=1055369&view=rev
Log:
SLING-1917 : Make Locking Strategy Configurable (for Cluster Usage)
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java (with props)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java?rev=1055369&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java Wed Jan 5 09:25:28 2011
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.QueryResult;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Services;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.support.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(metatype=true,label="%lm.name",description="%jlm.description")
+@Services({
+ @Service(value=Runnable.class),
+ @Service(value=LockManager.class)
+})
+@Properties({
+ @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
+ @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true)
+})
+/**
+ * The lock manager handles locking and unlocking nodes.
+ * It can be configured to handle locks in different ways:
+ */
+public class LockManager implements Runnable, EventListener {
+
+ /** Default repository path. */
+ private static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/cluster";
+
+ /** Modes */
+ private static final String MODE_SESSION = "session";
+ private static final String MODE_OPEN = "open";
+ private static final String MODE_NONE = "none";
+
+ /** Default lock mode. */
+ private static final String DEFAULT_MODE = MODE_SESSION;
+
+ /** Property to be updated by the heartbeat. */
+ private static final String LAST_MODIFIED_PROP = "lastModified";
+
+ /** Nodetype for heartbeat nodes. */
+ private static final String NODE_TYPE = "nt:unstructured";
+
+ /** Lock info prefix. */
+ private static final String OWNER_PREFIX = "SlingVersioningManager:";
+
+ /** The path where all beats are stored. */
+ @Property(value=DEFAULT_REPOSITORY_PATH, propertyPrivate=true)
+ private static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
+
+ /** Lock mode. */
+ @Property(value=DEFAULT_MODE,
+ options={@PropertyOption(name=MODE_SESSION,value="Session Scoped"),
+ @PropertyOption(name=MODE_OPEN,value="Open Scoped"),
+ @PropertyOption(name=MODE_NONE,value="None")})
+ private static final String CONFIG_PROPERTY_MODE = "lm.mode";
+
+ private static enum LockMode {
+ session,
+ open,
+ none
+ };
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** Last modified map. */
+ private final Map<String, Long> lastModifiedMap = new HashMap<String, Long>();
+
+ /** Flag, indicating that this service is running. */
+ private boolean running;
+
+ /** Lock for the background session. */
+ private final Object backgroundLock = new Object();
+
+ /** Background session. */
+ private Session backgroundSession;
+
+ @Reference
+ private EnvironmentComponent environment;
+
+ /** The repository path. */
+ private String repositoryPath;
+
+ /** The id node path. */
+ private String idNodePath;
+
+ /** Lock mode .*/
+ private LockMode mode;
+
+ /**
+ * Activate this component.
+ * @param props The configuration properties.
+ */
+ @Activate
+ protected void activate(final Map<String, Object> props) throws RepositoryException {
+ this.repositoryPath = OsgiUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH), DEFAULT_REPOSITORY_PATH);
+ this.idNodePath = repositoryPath + '/' + Environment.APPLICATION_ID;
+
+ // create the background session and register a listener
+ this.backgroundSession = this.environment.createAdminSession();
+ this.updateLastModified();
+ this.backgroundSession.getWorkspace().getObservationManager().addEventListener(this,
+ javax.jcr.observation.Event.PROPERTY_CHANGED
+ |javax.jcr.observation.Event.NODE_ADDED,
+ this.repositoryPath,
+ true,
+ null,
+ null,
+ true);
+ logger.info("Apache Sling Versioning Manager started on instance {}", Environment.APPLICATION_ID);
+ synchronized ( this.backgroundSession ) {
+ this.unlock(Environment.APPLICATION_ID);
+ }
+ this.scanExistingNodes();
+
+ this.update(props);
+ }
+
+ /**
+ * Deactivate this component.
+ */
+ @Deactivate
+ protected void deactivate() {
+ this.running = false;
+ if ( this.backgroundSession != null ) {
+ synchronized ( this.backgroundLock ) {
+ this.logger.debug("Shutting down background session.");
+ try {
+ this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
+ } catch (RepositoryException e) {
+ // we just ignore it
+ this.logger.warn("Unable to remove event listener.", e);
+ }
+ this.backgroundSession.logout();
+ this.backgroundSession = null;
+ }
+ }
+ logger.info("Apache Sling Versioning Manager stopped on instance {}", Environment.APPLICATION_ID);
+ }
+
+ @Modified
+ protected void update(final Map<String, Object> props) {
+ final LockMode oldMode = this.mode;
+ final String modeString = OsgiUtil.toString(props.get(CONFIG_PROPERTY_MODE), DEFAULT_MODE);
+ this.mode = LockMode.valueOf(modeString);
+ if ( oldMode != this.mode ) {
+ this.running = this.mode == LockMode.open;
+ }
+ }
+
+ /**
+ * Creates or gets the {@link javax.jcr.Node Node} at the given Path.
+ * In case it has to create the Node all non-existent intermediate path-elements
+ * will be create with the given intermediate node type and the returned node
+ * will be created with the given nodeType
+ *
+ * @param relativePath to create
+ * @return the Node at path
+ * @throws RepositoryException in case of exception accessing the Repository
+ */
+ private Node createPath(String relativePath)
+ throws RepositoryException {
+ final Node parentNode = this.backgroundSession.getRootNode();
+ if (!parentNode.hasNode(relativePath)) {
+ Node node = parentNode;
+ int pos = relativePath.lastIndexOf('/');
+ if ( pos != -1 ) {
+ final StringTokenizer st = new StringTokenizer(relativePath.substring(0, pos), "/");
+ while ( st.hasMoreTokens() ) {
+ final String token = st.nextToken();
+ if ( !node.hasNode(token) ) {
+ try {
+ node.addNode(token, NODE_TYPE);
+ node.getSession().save();
+ } catch (RepositoryException re) {
+ // we ignore this as this folder might be created from a different task
+ node.refresh(false);
+ }
+ }
+ node = node.getNode(token);
+ }
+ relativePath = relativePath.substring(pos + 1);
+ }
+ if ( !node.hasNode(relativePath) ) {
+ node.addNode(relativePath, NODE_TYPE);
+ }
+ return node.getNode(relativePath);
+ }
+ return parentNode.getNode(relativePath);
+ }
+
+ /**
+ * Update the last modified of this node
+ */
+ private void updateLastModified() {
+ synchronized ( this.backgroundLock ) {
+ try {
+ final Node slingNode = this.createPath(this.idNodePath.substring(1));
+ slingNode.setProperty(LAST_MODIFIED_PROP, System.currentTimeMillis());
+ this.backgroundSession.save();
+ logger.debug("Heartbeat at {}", Environment.APPLICATION_ID);
+ } catch (final RepositoryException re) {
+ this.ignoreException(re);
+ }
+ }
+ }
+
+ /** Scan for existing ids */
+ private void scanExistingNodes() {
+ synchronized ( this.backgroundLock ) {
+ try {
+ final Node rootNode = this.backgroundSession.getNode(this.repositoryPath);
+ final NodeIterator nI = rootNode.getNodes();
+ while ( nI.hasNext() ) {
+ final Node node = nI.nextNode();
+ final String id = node.getName();
+ if ( !Environment.APPLICATION_ID.equals(id) && node.hasProperty(LAST_MODIFIED_PROP) ) {
+ final javax.jcr.Property prop = node.getProperty(LAST_MODIFIED_PROP);
+ logger.debug("Updated heartbeat from {}", id);
+ this.lastModifiedMap.put(id, prop.getLong());
+ }
+ }
+ } catch (final RepositoryException re) {
+ this.ignoreException(re);
+ }
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e Exception to ignore
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Cron job
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ if ( this.running ) {
+ // we update last modified
+ this.updateLastModified();
+ final long teeMinusTwo = System.currentTimeMillis() - 120000;
+ synchronized ( this.backgroundLock ) {
+ for(final Map.Entry<String, Long> entry : this.lastModifiedMap.entrySet() ) {
+ if ( entry.getValue() != -1 ) {
+ logger.debug("Checking cluster node {}", entry.getKey());
+ if ( entry.getValue() <= teeMinusTwo ) {
+ this.unlock(entry.getKey());
+ entry.setValue(-1L);
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Search all locked nodes with this id
+ * @param id The sling id
+ */
+ private void unlock(final String id) {
+ logger.info("Trying to unlock {}", id);
+ try {
+ final String searchString = OWNER_PREFIX + id;
+
+ final QueryManager qm = this.backgroundSession.getWorkspace().getQueryManager();
+ final Query q = qm.createQuery("select * from [nt:base] where [" + JCRHelper.NODE_PROPERTY_LOCK_OWNER + "] = '" + searchString + "'",
+ Query.JCR_SQL2);
+ final QueryResult qr = q.execute();
+ final NodeIterator nI = qr.getNodes();
+ while ( nI.hasNext() ) {
+ final Node node = nI.nextNode();
+ try {
+ if ( node.hasProperty(JCRHelper.NODE_PROPERTY_LOCK_OWNER) ) {
+ if ( node.isLocked()
+ && node.getProperty(JCRHelper.NODE_PROPERTY_LOCK_OWNER).getString().endsWith(searchString) ) {
+ logger.debug("Trying to unlock node {} from {}", node.getPath(), id);
+ this.backgroundSession.getWorkspace().getLockManager().unlock(node.getPath());
+ }
+ }
+ } catch (final RepositoryException re) {
+ this.ignoreException(re);
+ }
+ }
+ } catch (final RepositoryException re) {
+ this.ignoreException(re);
+ }
+ }
+
+ /**
+ * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
+ */
+ public void onEvent(final EventIterator events) {
+ synchronized ( this.backgroundLock ) {
+ while ( events.hasNext() ) {
+ final Event event = events.nextEvent();
+ if ( this.running ) {
+ try {
+ final String path = event.getType() == javax.jcr.observation.Event.NODE_ADDED
+ ? event.getPath() + '/' + LAST_MODIFIED_PROP : event.getPath();
+ if ( this.backgroundSession.propertyExists(path) ) {
+ final javax.jcr.Property prop = this.backgroundSession.getProperty(path);
+ final String id = prop.getParent().getName();
+ logger.debug("Updated heartbeat from {}", id);
+ this.lastModifiedMap.put(id, prop.getLong());
+ }
+ } catch (final RepositoryException re) {
+ this.ignoreException(re);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Lock the node at the given path
+ * @param session The session to create the lock with
+ * @param path The path to the node to lock
+ * @throws RepositoryException If anything goes wrong
+ */
+ public void lock(final Session session, final String path) throws RepositoryException {
+ if ( this.mode != LockMode.none ) {
+ session.getWorkspace().getLockManager().lock(path, false,
+ this.mode == LockMode.session, Long.MAX_VALUE,
+ OWNER_PREFIX + Environment.APPLICATION_ID);
+ }
+ }
+
+ /**
+ * Unlock the node at the given path.
+ * @param session The session for unlocking
+ * @param path The path to the node to unlock
+ * @throws RepositoryException If anything goes wrong
+ */
+ public void unlock(final Session session, final String path)
+ throws RepositoryException {
+ if ( this.mode != LockMode.none ) {
+ session.getWorkspace().getLockManager().unlock(path);
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/LockManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1055369&r1=1055368&r2=1055369&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Wed Jan 5 09:25:28 2011
@@ -81,7 +81,9 @@ import org.slf4j.LoggerFactory;
value={"org/osgi/framework/BundleEvent/UPDATED",
"org/osgi/framework/BundleEvent/STARTED",
JobUtil.TOPIC_JOB}),
- @Property(name="scheduler.period", longValue=300,label="%persscheduler.period.name",description="%persscheduler.period.description"),
+ @Property(name="scheduler.period", longValue=300,
+ label="%persscheduler.period.name",
+ description="%persscheduler.period.description"),
@Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true)
})
public class PersistenceHandler implements EventListener, Runnable, EventHandler {
@@ -96,7 +98,9 @@ public class PersistenceHandler implemen
/** Default clean up time is 5 minutes. */
private static final int DEFAULT_CLEANUP_PERIOD = 5;
- @Property(intValue=DEFAULT_CLEANUP_PERIOD,label="%jobcleanup.period.name",description="%jobcleanup.period.description")
+ @Property(intValue=DEFAULT_CLEANUP_PERIOD,
+ label="%jobcleanup.period.name",
+ description="%jobcleanup.period.description")
private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
/** Default maximum load jobs. */
@@ -157,6 +161,9 @@ public class PersistenceHandler implemen
@Reference
private JobManager jobManager;
+ @Reference
+ private LockManager lockManager;
+
/**
* Activate this component.
* @param context The component context.
@@ -900,7 +907,7 @@ public class PersistenceHandler implemen
if ( !eventNode.isLocked() ) {
// lock node
try {
- this.backgroundSession.getWorkspace().getLockManager().lock(path, false, true, Long.MAX_VALUE, "JobEventHandler:" + Environment.APPLICATION_ID);
+ this.lockManager.lock(this.backgroundSession, path);
} catch (RepositoryException re) {
// lock failed which means that the node is locked by someone else, so we don't have to requeue
return false;
@@ -945,7 +952,7 @@ public class PersistenceHandler implemen
return;
}
try {
- this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+ this.lockManager.unlock(this.backgroundSession, path);
} catch (RepositoryException re) {
// there is nothing we can do
this.ignoreException(re);
@@ -964,8 +971,8 @@ public class PersistenceHandler implemen
return;
}
try {
+ ((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
if ( this.backgroundSession.itemExists(path) ) {
- ((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
final Node eventNode = (Node)this.backgroundSession.getItem(path);
if ( jobId == null ) {
// simply remove the node
@@ -977,7 +984,7 @@ public class PersistenceHandler implemen
this.backgroundSession.save();
// and unlock
if ( jobId != null && eventNode.isLocked() ) {
- this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+ this.lockManager.unlock(this.backgroundSession, path);
}
}
} catch (RepositoryException re) {
@@ -1043,7 +1050,7 @@ public class PersistenceHandler implemen
this.backgroundSession.save();
// and unlock
- this.backgroundSession.getWorkspace().getLockManager().unlock(path);
+ this.lockManager.unlock(this.backgroundSession, path);
return true;
}
} catch (RepositoryException re) {
@@ -1051,6 +1058,7 @@ public class PersistenceHandler implemen
this.ignoreException(re);
}
}
+ ((DefaultJobManager)this.jobManager).notifyRemoveJob(info.uniqueId);
return false;
}
Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1055369&r1=1055368&r2=1055369&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Wed Jan 5 09:25:28 2011
@@ -162,3 +162,18 @@ minPoolSize.description = The size of th
priority.name = Priority
priority.description = The priority for the threads from this pool. Default is norm.
+
+#
+# Lock Manager
+lm.name = Apache Sling Event Lock Manager
+lm.description = This service is responsible for locking and unlock the event nodes. Dependening \
+ on the environment, special configuration can improve the performance.
+
+lm.mode.name = Lock Mode
+lm.mode.description = The lock mode defines how the events are locked in the repository. The default \
+ is to use session scoped locks. With session scoped locks it's the task of the repository to propagate \
+ unlocks in a cluster if a session/cluster node dies. When open scoped locks are used, the lock manager \
+ takes care to propagate this information. Please note, that Apache Jackrabbit currently does not support \
+ session scoped locks in a cluster and the security is too strong when it comes to open scoped locks. \
+ The setting none should only be used, if no cluster is used or if by other means it is guaranteed that \
+ only a single node in the cluster is processing jobs.
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java?rev=1055369&r1=1055368&r2=1055369&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java Wed Jan 5 09:25:28 2011
@@ -25,6 +25,7 @@ import junitx.util.PrivateAccessor;
import org.apache.sling.event.impl.AbstractTest;
import org.apache.sling.event.impl.SimpleScheduler;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.jcr.LockManager;
import org.apache.sling.event.impl.jobs.jcr.PersistenceHandler;
import org.jmock.Expectations;
import org.jmock.integration.junit4.JMock;
@@ -57,6 +58,7 @@ public abstract class AbstractJobEventHa
this.handler = new PersistenceHandler();
PrivateAccessor.setField(this.handler, "environment", this.environment);
PrivateAccessor.setField(this.handler, "jobManager", this.jobManager);
+ PrivateAccessor.setField(this.handler, "lockManager", new LockManager());
// lets set up the bundle context
final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "beforeBundleContext" + activateCount);