You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/09/21 17:16:52 UTC
svn commit: r999457 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/ test/java/org/apache/sling/event/impl/
Author: cziegeler
Date: Tue Sep 21 15:15:36 2010
New Revision: 999457
URL: http://svn.apache.org/viewvc?rev=999457&view=rev
Log:
SLING-1790: Job handling only uses the cluster for failover
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Tue Sep 21 15:15:36 2010
@@ -184,11 +184,13 @@ public class DistributingEventHandler
try {
session = this.createSession();
final Node eventNode = (Node)session.getItem(info.nodePath);
- final EventAdmin localEA = this.eventAdmin;
- if ( localEA != null ) {
- localEA.postEvent(this.readEvent(eventNode));
- } else {
- this.logger.error("Unable to post event as no event admin is available.");
+ if ( eventNode.isNodeType(this.getEventNodeType()) ) {
+ final EventAdmin localEA = this.eventAdmin;
+ if ( localEA != null ) {
+ localEA.postEvent(this.readEvent(eventNode));
+ } else {
+ this.logger.error("Unable to post event as no event admin is available.");
+ }
}
} catch (Exception ex) {
this.logger.error("Exception during reading the event from the repository.", ex);
@@ -249,6 +251,6 @@ public class DistributingEventHandler
protected void startWriterSession() throws RepositoryException {
super.startWriterSession();
this.writerSession.getWorkspace().getObservationManager()
- .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, new String[] {this.getEventNodeType()}, true);
+ .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true);
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Tue Sep 21 15:15:36 2010
@@ -581,7 +581,7 @@ public class JobEventHandler
// for another application node
final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
- && !this.applicationId.equals(appId) ) {
+ && appId != null && !this.applicationId.equals(appId) ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(info.event));
}
@@ -653,6 +653,19 @@ public class JobEventHandler
}
}
+ @Override
+ protected void startWriterSession() throws RepositoryException {
+ super.startWriterSession();
+ this.writerSession.getWorkspace().getObservationManager()
+ .addEventListener(this,
+ javax.jcr.observation.Event.NODE_ADDED,
+ this.repositoryPath,
+ true,
+ null,
+ null,
+ true);
+ }
+
/**
* This method runs in the background and processes the local queue.
*/
@@ -660,13 +673,13 @@ public class JobEventHandler
// create the background session and register a listener
this.backgroundSession = this.createSession();
this.backgroundSession.getWorkspace().getObservationManager()
- .addEventListener(this,
- javax.jcr.observation.Event.PROPERTY_REMOVED,
- this.repositoryPath,
- true,
- null,
- new String[] {this.getEventNodeType()},
- true);
+ .addEventListener(this,
+ javax.jcr.observation.Event.PROPERTY_REMOVED|javax.jcr.observation.Event.NODE_REMOVED,
+ this.repositoryPath,
+ true,
+ null,
+ null,
+ true);
if ( this.running ) {
logger.info("Apache Sling Job Event Handler started on instance {}", this.applicationId);
logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}," +
@@ -1137,28 +1150,47 @@ public class JobEventHandler
try {
while ( iter.hasNext() ) {
final javax.jcr.observation.Event event = iter.nextEvent();
- if ( event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
- try {
- final String propPath = event.getPath();
- int pos = propPath.lastIndexOf('/');
- final String nodePath = propPath.substring(0, pos);
- final String propertyName = propPath.substring(pos+1);
+
+ try {
+ final String path = event.getPath();
+ String loadNodePath = null;
+
+ if ( event.getType() == javax.jcr.observation.Event.NODE_ADDED) {
+ loadNodePath = path;
+ } else if ( event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+ final int pos = path.lastIndexOf('/');
+ final String propertyName = path.substring(pos+1);
// we are only interested in unlocks
if ( "jcr:lockOwner".equals(propertyName) ) {
- if ( s == null ) {
- s = this.createSession();
- }
- // we do a sanity check if the node exists first
- if ( s.itemExists(nodePath) ) {
- final Node eventNode = (Node) s.getItem(nodePath);
+ loadNodePath = path.substring(0, pos);
+ }
+ } else if ( event.getType() == javax.jcr.observation.Event.NODE_REMOVED) {
+ synchronized (unloadedJobs) {
+ this.unloadedJobs.remove(path);
+ }
+ }
+ if ( loadNodePath != null ) {
+ if ( s == null ) {
+ s = this.createSession();
+ }
+ // we do a sanity check if the node exists first
+ if ( s.itemExists(loadNodePath) ) {
+ final Node eventNode = (Node) s.getItem(loadNodePath);
+ if ( eventNode.isNodeType(EventHelper.JOB_NODE_TYPE) ) {
+ if ( event.getType() == javax.jcr.observation.Event.NODE_ADDED ) {
+ logger.debug("New job has been added by someone else. Trying to load from {}", loadNodePath);
+ } else {
+ logger.debug("Job execution failed by someone else. Trying to load from {}", loadNodePath);
+ }
tryToLoadJob(eventNode, this.unloadedJobs);
}
}
- } catch (RepositoryException re) {
- this.logger.error("Exception during jcr event processing.", re);
}
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during jcr event processing.", re);
}
+
}
} finally {
if ( s != null ) {
@@ -1255,9 +1287,8 @@ public class JobEventHandler
*/
private boolean tryToLoadJob(final Node eventNode, final Set<String> unloadedJobSet) {
try {
- // first check: node should be unlocked (= not processed by any other cluster node)
- // job should not be finished
- if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+ // first check: job should not be finished
+ if ( !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
final String nodePath = eventNode.getPath();
boolean isNonLocal = false;
// second check: is this a job that should only run on the instance that it was created on?
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Tue Sep 21 15:15:36 2010
@@ -48,7 +48,7 @@ import org.osgi.service.event.EventAdmin
@RunWith(JMock.class)
public abstract class AbstractRepositoryEventHandlerTest {
- protected AbstractRepositoryEventHandler handler;
+ protected volatile AbstractRepositoryEventHandler handler;
protected static final String REPO_PATH = "/test/events";
protected static final String SLING_ID = "4711";
@@ -57,6 +57,8 @@ public abstract class AbstractRepository
protected abstract Mockery getMockery();
+ protected abstract AbstractRepositoryEventHandler createHandler();
+
protected Dictionary<String, Object> getComponentConfig() {
final Dictionary<String, Object> config = new Hashtable<String, Object>();
config.put(AbstractRepositoryEventHandler.CONFIG_PROPERTY_REPO_PATH, REPO_PATH);
@@ -79,11 +81,20 @@ public abstract class AbstractRepository
@org.junit.AfterClass public static void shutdownRepository() throws Exception {
if ( session != null ) {
session.logout();
+ session = null;
}
RepositoryTestUtil.stopRepository();
}
@org.junit.Before public void setup() throws Exception {
+ // activate
+ this.activate(null);
+ }
+
+ int activateCount = 1;
+
+ protected void activate(final EventAdmin ea) {
+ this.handler = this.createHandler();
this.handler.repository = RepositoryTestUtil.getSlingRepository();
this.handler.classLoaderManager = new DynamicClassLoaderManager() {
@@ -92,12 +103,16 @@ public abstract class AbstractRepository
}
};
// the event admin
- final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class);
- this.handler.eventAdmin = eventAdmin;
- this.getMockery().checking(new Expectations() {{
- allowing(eventAdmin).postEvent(with(any(Event.class)));
- allowing(eventAdmin).sendEvent(with(any(Event.class)));
- }});
+ if ( ea != null ) {
+ this.handler.eventAdmin = ea;
+ } else {
+ final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class, "eventAdmin" + activateCount);
+ this.handler.eventAdmin = eventAdmin;
+ this.getMockery().checking(new Expectations() {{
+ allowing(eventAdmin).postEvent(with(any(Event.class)));
+ allowing(eventAdmin).sendEvent(with(any(Event.class)));
+ }});
+ }
// sling settings service
this.handler.settingsService = new SlingSettingsService() {
@@ -122,13 +137,13 @@ public abstract class AbstractRepository
this.handler.threadPool = new ThreadPoolImpl();
// lets set up the bundle context
- final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "beforeBundleContext");
+ final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "beforeBundleContext" + activateCount);
// lets set up the component configuration
final Dictionary<String, Object> componentConfig = this.getComponentConfig();
// lets set up the compnent context
- final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "beforeComponentContext");
+ final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "beforeComponentContext" + activateCount);
this.getMockery().checking(new Expectations() {{
allowing(componentContext).getBundleContext();
will(returnValue(bundleContext));
@@ -137,23 +152,35 @@ public abstract class AbstractRepository
}});
this.handler.activate(componentContext);
+
// the session is initialized in the background, so let's sleep some seconds
- Thread.sleep(2 * 1000);
+ try {
+ Thread.sleep(2 * 1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
- @org.junit.After public void shutdown() throws Exception {
+ protected void deactivate() {
// lets set up the bundle context with the sling id
- final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "afterBundleContext");
+ final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "afterBundleContext" + activateCount);
- final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "afterComponentContext");
+ final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "afterComponentContext" + activateCount);
this.getMockery().checking(new Expectations() {{
allowing(componentContext).getBundleContext();
will(returnValue(bundleContext));
}});
this.handler.deactivate(componentContext);
+ this.handler = null;
+ activateCount++;
+ }
+
+ @org.junit.After public void shutdown() throws Exception {
+ final String path = this.handler.repositoryPath;
+ this.deactivate();
try {
// delete all child nodes to get a clean repository again
- final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+ final Node rootNode = (Node) session.getItem(path);
final NodeIterator iter = rootNode.getNodes();
while ( iter.hasNext() ) {
final Node child = iter.nextNode();
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java Tue Sep 21 15:15:36 2010
@@ -45,11 +45,15 @@ public class DistributingEventHandlerTes
protected Mockery context;
public DistributingEventHandlerTest() {
- this.handler = new DistributingEventHandler();
this.context = new JUnit4Mockery();
}
@Override
+ protected AbstractRepositoryEventHandler createHandler() {
+ return new DistributingEventHandler();
+ }
+
+ @Override
protected Mockery getMockery() {
return this.context;
}
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Tue Sep 21 15:15:36 2010
@@ -34,11 +34,13 @@ import javax.jcr.RepositoryException;
import javax.jcr.observation.EventListenerIterator;
import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.job.JobStatusNotifier;
import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
import org.jmock.integration.junit4.JUnit4Mockery;
import org.junit.runner.RunWith;
import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
@RunWith(JMock.class)
@@ -47,9 +49,7 @@ public class JobEventHandlerTest extends
protected Mockery context;
public JobEventHandlerTest() {
- this.handler = new JobEventHandler();
this.context = new JUnit4Mockery();
- ((JobEventHandler)this.handler).scheduler = new SimpleScheduler();
}
@Override
@@ -58,9 +58,17 @@ public class JobEventHandlerTest extends
}
@Override
+ protected AbstractRepositoryEventHandler createHandler() {
+ final JobEventHandler h = new JobEventHandler();
+ h.scheduler = new SimpleScheduler();
+ return h;
+ }
+
+ @Override
protected Dictionary<String, Object> getComponentConfig() {
final Dictionary<String, Object> config = super.getComponentConfig();
config.put("cleanup.period", 1); // set clean up to 1 minute
+ config.put("load.delay", 1); // load delay to 1 sec
return config;
}
@@ -98,6 +106,29 @@ public class JobEventHandlerTest extends
if ( parallel != null ) {
props.put(EventUtil.PROPERTY_JOB_PARALLEL, parallel);
}
+ return getJobEvent(queueName, id, parallel, false);
+ }
+
+ /**
+ * Helper method to create a job event.
+ */
+ private Event getJobEvent(String queueName, String id, String parallel, boolean runlocal) {
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EventUtil.PROPERTY_JOB_TOPIC, "sling/test");
+ if ( id != null ) {
+ props.put(EventUtil.PROPERTY_JOB_ID, id);
+ }
+ props.put(EventUtil.PROPERTY_JOB_RETRY_DELAY, 2000L);
+ props.put(EventUtil.PROPERTY_JOB_RETRIES, 2);
+ if ( queueName != null ) {
+ props.put(EventUtil.PROPERTY_JOB_QUEUE_NAME, queueName);
+ }
+ if ( parallel != null ) {
+ props.put(EventUtil.PROPERTY_JOB_PARALLEL, parallel);
+ }
+ if ( runlocal ) {
+ props.put(EventUtil.PROPERTY_JOB_RUN_LOCAL, "true");
+ }
return new Event(EventUtil.TOPIC_JOB, props);
}
@@ -438,4 +469,107 @@ public class JobEventHandlerTest extends
assertTrue(handler.getWriterRootNode().hasNode("7"));
assertTrue(handler.getWriterRootNode().hasNode("8"));
}
+
+ @org.junit.Test public void testLoad() throws Exception {
+ final List<Integer> retryCountList = new ArrayList<Integer>();
+ final JobEventHandler jeh = (JobEventHandler)this.handler;
+ final Barrier cb = new Barrier(2);
+ final EventAdmin ea = new SimpleEventAdmin(new String[] {"sling/test"},
+ new EventHandler[] {
+ new EventHandler() {
+ int retryCount;
+ public void handleEvent(Event event) {
+ retryCountList.add(retryCount);
+ EventUtil.acknowledgeJob(event);
+ if ( retryCount == 0 ) {
+ EventUtil.rescheduleJob(event);
+ } else {
+ EventUtil.finishedJob(event);
+ }
+ retryCount++;
+ cb.block();
+ }
+ }
+ });
+ jeh.eventAdmin = ea;
+ jeh.handleEvent(getJobEvent(null, null, null));
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ this.deactivate();
+ assertEquals("Unexpected number of retries", 1, retryCountList.size());
+ Thread.sleep(3000);
+ assertEquals("Unexpected number of retries", 1, retryCountList.size());
+ this.activate(ea);
+ // the job is retried after loading, so we wait again
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ assertFalse("Unexpected event received in the given time.", cb.block(5));
+ assertEquals("Unexpected number of retries", 2, retryCountList.size());
+ }
+
+ @org.junit.Test public void testRunLocal() throws Exception {
+ final List<Integer> retryCountList = new ArrayList<Integer>();
+ final List<String> sessionPath = new ArrayList<String>();
+ JobEventHandler jeh = (JobEventHandler)this.handler;
+ final Barrier cb = new Barrier(2);
+ final EventAdmin ea = new SimpleEventAdmin(new String[] {"sling/test"},
+ new EventHandler[] {
+ new EventHandler() {
+ int retryCount;
+ public void handleEvent(Event event) {
+ retryCountList.add(retryCount);
+ EventUtil.acknowledgeJob(event);
+ if ( retryCount == 0 || retryCount == 1) {
+ // get the job node from the context
+ final JobStatusNotifier.NotifierContext ctx = (JobStatusNotifier.NotifierContext) event.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+ sessionPath.add(ctx.eventNodePath);
+ EventUtil.rescheduleJob(event);
+ } else {
+ EventUtil.finishedJob(event);
+ }
+ retryCount++;
+ cb.block();
+ }
+ }
+ });
+ jeh.eventAdmin = ea;
+ // first test: local event and we change the application id
+ jeh.handleEvent(getJobEvent(null, null, null, true));
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ this.deactivate();
+ assertEquals("Unexpected number of retries", 1, retryCountList.size());
+ Thread.sleep(3000);
+ assertEquals("Unexpected number of retries", 1, retryCountList.size());
+ assertEquals("Unexpected number of paths", 1, sessionPath.size());
+ // change app id
+ final String nodePath = sessionPath.get(0);
+ session.getNode(nodePath).setProperty(EventHelper.NODE_PROPERTY_APPLICATION, "unknown");
+ session.save();
+
+ this.activate(ea);
+ jeh = (JobEventHandler)this.handler;
+ // the job is not retried after loading, so we wait again
+ assertFalse("Unexpected event received in the given time.", cb.block(5));
+ cb.reset();
+ assertEquals("Unexpected number of retries", 1, retryCountList.size());
+
+ // second test: local event and we don't change the application id
+ jeh.handleEvent(getJobEvent(null, null, null, true));
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ this.deactivate();
+ assertEquals("Unexpected number of retries", 2, retryCountList.size());
+ Thread.sleep(3000);
+ assertEquals("Unexpected number of retries", 2, retryCountList.size());
+ assertEquals("Unexpected number of paths", 2, sessionPath.size());
+
+ this.activate(ea);
+ // the job is retried after loading, so we wait again
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ assertFalse("Unexpected event received in the given time.", cb.block(5));
+ cb.reset();
+ assertEquals("Unexpected number of retries", 3, retryCountList.size());
+ }
}