You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/09/21 17:16:52 UTC

svn commit: r999457 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/ test/java/org/apache/sling/event/impl/

Author: cziegeler
Date: Tue Sep 21 15:15:36 2010
New Revision: 999457

URL: http://svn.apache.org/viewvc?rev=999457&view=rev
Log:
SLING-1790 : Job handling does only use cluster for failover

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Tue Sep 21 15:15:36 2010
@@ -184,11 +184,13 @@ public class DistributingEventHandler
                     try {
                         session = this.createSession();
                         final Node eventNode = (Node)session.getItem(info.nodePath);
-                        final EventAdmin localEA = this.eventAdmin;
-                        if ( localEA != null ) {
-                            localEA.postEvent(this.readEvent(eventNode));
-                        } else {
-                            this.logger.error("Unable to post event as no event admin is available.");
+                        if ( eventNode.isNodeType(this.getEventNodeType()) ) {
+                            final EventAdmin localEA = this.eventAdmin;
+                            if ( localEA != null ) {
+                                localEA.postEvent(this.readEvent(eventNode));
+                            } else {
+                                this.logger.error("Unable to post event as no event admin is available.");
+                            }
                         }
                     } catch (Exception ex) {
                         this.logger.error("Exception during reading the event from the repository.", ex);
@@ -249,6 +251,6 @@ public class DistributingEventHandler
     protected void startWriterSession() throws RepositoryException {
         super.startWriterSession();
         this.writerSession.getWorkspace().getObservationManager()
-            .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, new String[] {this.getEventNodeType()}, true);
+            .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true);
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Tue Sep 21 15:15:36 2010
@@ -581,7 +581,7 @@ public class JobEventHandler
         // for another application node
         final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
         if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null
-            && !this.applicationId.equals(appId) ) {
+            && appId != null && !this.applicationId.equals(appId) ) {
             if ( logger.isDebugEnabled() ) {
                  logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(info.event));
             }
@@ -653,6 +653,19 @@ public class JobEventHandler
         }
     }
 
+    @Override
+    protected void startWriterSession() throws RepositoryException {
+        super.startWriterSession();
+        this.writerSession.getWorkspace().getObservationManager()
+        .addEventListener(this,
+                          javax.jcr.observation.Event.NODE_ADDED,
+                          this.repositoryPath,
+                          true,
+                          null,
+                          null,
+                          true);
+    }
+
     /**
      * This method runs in the background and processes the local queue.
      */
@@ -660,13 +673,13 @@ public class JobEventHandler
         // create the background session and register a listener
         this.backgroundSession = this.createSession();
         this.backgroundSession.getWorkspace().getObservationManager()
-                .addEventListener(this,
-                                  javax.jcr.observation.Event.PROPERTY_REMOVED,
-                                  this.repositoryPath,
-                                  true,
-                                  null,
-                                  new String[] {this.getEventNodeType()},
-                                  true);
+        .addEventListener(this,
+                          javax.jcr.observation.Event.PROPERTY_REMOVED|javax.jcr.observation.Event.NODE_REMOVED,
+                          this.repositoryPath,
+                          true,
+                          null,
+                          null,
+                          true);
         if ( this.running ) {
             logger.info("Apache Sling Job Event Handler started on instance {}", this.applicationId);
             logger.debug("Job Handler Configuration: (sleepTime={} secs, maxJobRetries={}," +
@@ -1137,28 +1150,47 @@ public class JobEventHandler
         try {
             while ( iter.hasNext() ) {
                 final javax.jcr.observation.Event event = iter.nextEvent();
-                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
-                    try {
-                        final String propPath = event.getPath();
-                        int pos = propPath.lastIndexOf('/');
-                        final String nodePath = propPath.substring(0, pos);
-                        final String propertyName = propPath.substring(pos+1);
+
+                try {
+                    final String path = event.getPath();
+                    String loadNodePath = null;
+
+                    if ( event.getType() == javax.jcr.observation.Event.NODE_ADDED) {
+                        loadNodePath = path;
+                    } else  if ( event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
+                        final int pos = path.lastIndexOf('/');
+                        final String propertyName = path.substring(pos+1);
 
                         // we are only interested in unlocks
                         if ( "jcr:lockOwner".equals(propertyName) ) {
-                            if ( s == null ) {
-                                s = this.createSession();
-                            }
-                            // we do a sanity check if the node exists first
-                            if ( s.itemExists(nodePath) ) {
-                                final Node eventNode = (Node) s.getItem(nodePath);
+                            loadNodePath = path.substring(0, pos);
+                        }
+                    } else if ( event.getType() == javax.jcr.observation.Event.NODE_REMOVED) {
+                        synchronized (unloadedJobs) {
+                            this.unloadedJobs.remove(path);
+                        }
+                    }
+                    if ( loadNodePath != null ) {
+                        if ( s == null ) {
+                            s = this.createSession();
+                        }
+                        // we do a sanity check if the node exists first
+                        if ( s.itemExists(loadNodePath) ) {
+                            final Node eventNode = (Node) s.getItem(loadNodePath);
+                            if ( eventNode.isNodeType(EventHelper.JOB_NODE_TYPE) ) {
+                                if ( event.getType() == javax.jcr.observation.Event.NODE_ADDED ) {
+                                    logger.debug("New job has been added by someone else. Trying to load from {}", loadNodePath);
+                                } else {
+                                    logger.debug("Job execution failed by someone else. Trying to load from {}", loadNodePath);
+                                }
                                 tryToLoadJob(eventNode, this.unloadedJobs);
                             }
                         }
-                    } catch (RepositoryException re) {
-                        this.logger.error("Exception during jcr event processing.", re);
                     }
+                } catch (RepositoryException re) {
+                    this.logger.error("Exception during jcr event processing.", re);
                 }
+
             }
         } finally {
             if ( s != null ) {
@@ -1255,9 +1287,8 @@ public class JobEventHandler
      */
     private boolean tryToLoadJob(final Node eventNode, final Set<String> unloadedJobSet) {
         try {
-            // first check: node should be unlocked (= not processed by any other cluster node)
-            //              job should not be finished
-            if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+            // first check: job should not be finished
+            if ( !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
                 final String nodePath = eventNode.getPath();
                 boolean isNonLocal = false;
                 // second check: is this a job that should only run on the instance that it was created on?

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Tue Sep 21 15:15:36 2010
@@ -48,7 +48,7 @@ import org.osgi.service.event.EventAdmin
 @RunWith(JMock.class)
 public abstract class AbstractRepositoryEventHandlerTest {
 
-    protected AbstractRepositoryEventHandler handler;
+    protected volatile AbstractRepositoryEventHandler handler;
 
     protected static final String REPO_PATH = "/test/events";
     protected static final String SLING_ID = "4711";
@@ -57,6 +57,8 @@ public abstract class AbstractRepository
 
     protected abstract Mockery getMockery();
 
+    protected abstract AbstractRepositoryEventHandler createHandler();
+
     protected Dictionary<String, Object> getComponentConfig() {
         final Dictionary<String, Object> config = new Hashtable<String, Object>();
         config.put(AbstractRepositoryEventHandler.CONFIG_PROPERTY_REPO_PATH, REPO_PATH);
@@ -79,11 +81,20 @@ public abstract class AbstractRepository
     @org.junit.AfterClass public static void shutdownRepository() throws Exception {
         if ( session != null ) {
             session.logout();
+            session = null;
         }
         RepositoryTestUtil.stopRepository();
     }
 
     @org.junit.Before public void setup() throws Exception {
+        // activate
+        this.activate(null);
+    }
+
+    int activateCount = 1;
+
+    protected void activate(final EventAdmin ea) {
+        this.handler = this.createHandler();
         this.handler.repository = RepositoryTestUtil.getSlingRepository();
         this.handler.classLoaderManager = new DynamicClassLoaderManager() {
 
@@ -92,12 +103,16 @@ public abstract class AbstractRepository
             }
         };
         // the event admin
-        final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class);
-        this.handler.eventAdmin = eventAdmin;
-        this.getMockery().checking(new Expectations() {{
-            allowing(eventAdmin).postEvent(with(any(Event.class)));
-            allowing(eventAdmin).sendEvent(with(any(Event.class)));
-        }});
+        if ( ea != null ) {
+            this.handler.eventAdmin = ea;
+        } else {
+            final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class, "eventAdmin" + activateCount);
+            this.handler.eventAdmin = eventAdmin;
+            this.getMockery().checking(new Expectations() {{
+                allowing(eventAdmin).postEvent(with(any(Event.class)));
+                allowing(eventAdmin).sendEvent(with(any(Event.class)));
+            }});
+        }
 
         // sling settings service
         this.handler.settingsService = new SlingSettingsService() {
@@ -122,13 +137,13 @@ public abstract class AbstractRepository
         this.handler.threadPool = new ThreadPoolImpl();
 
         // lets set up the bundle context
-        final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "beforeBundleContext");
+        final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "beforeBundleContext" + activateCount);
 
         // lets set up the component configuration
         final Dictionary<String, Object> componentConfig = this.getComponentConfig();
 
         // lets set up the compnent context
-        final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "beforeComponentContext");
+        final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "beforeComponentContext" + activateCount);
         this.getMockery().checking(new Expectations() {{
             allowing(componentContext).getBundleContext();
             will(returnValue(bundleContext));
@@ -137,23 +152,35 @@ public abstract class AbstractRepository
         }});
 
         this.handler.activate(componentContext);
+
         // the session is initialized in the background, so let's sleep some seconds
-        Thread.sleep(2 * 1000);
+        try {
+            Thread.sleep(2 * 1000);
+        } catch (InterruptedException e) {
+            // ignore
+        }
     }
 
-    @org.junit.After public void shutdown() throws Exception {
+    protected void deactivate() {
         // lets set up the bundle context with the sling id
-        final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "afterBundleContext");
+        final BundleContext bundleContext = this.getMockery().mock(BundleContext.class, "afterBundleContext" + activateCount);
 
-        final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "afterComponentContext");
+        final ComponentContext componentContext = this.getMockery().mock(ComponentContext.class, "afterComponentContext" + activateCount);
         this.getMockery().checking(new Expectations() {{
             allowing(componentContext).getBundleContext();
             will(returnValue(bundleContext));
         }});
         this.handler.deactivate(componentContext);
+        this.handler = null;
+        activateCount++;
+    }
+
+    @org.junit.After public void shutdown() throws Exception {
+        final String path = this.handler.repositoryPath;
+        this.deactivate();
         try {
             // delete all child nodes to get a clean repository again
-            final Node rootNode = (Node) session.getItem(this.handler.repositoryPath);
+            final Node rootNode = (Node) session.getItem(path);
             final NodeIterator iter = rootNode.getNodes();
             while ( iter.hasNext() ) {
                 final Node child = iter.nextNode();

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java Tue Sep 21 15:15:36 2010
@@ -45,11 +45,15 @@ public class DistributingEventHandlerTes
     protected Mockery context;
 
     public DistributingEventHandlerTest() {
-        this.handler = new DistributingEventHandler();
         this.context = new JUnit4Mockery();
     }
 
     @Override
+    protected AbstractRepositoryEventHandler createHandler() {
+        return new DistributingEventHandler();
+    }
+
+    @Override
     protected Mockery getMockery() {
         return this.context;
     }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=999457&r1=999456&r2=999457&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java Tue Sep 21 15:15:36 2010
@@ -34,11 +34,13 @@ import javax.jcr.RepositoryException;
 import javax.jcr.observation.EventListenerIterator;
 
 import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.job.JobStatusNotifier;
 import org.jmock.Mockery;
 import org.jmock.integration.junit4.JMock;
 import org.jmock.integration.junit4.JUnit4Mockery;
 import org.junit.runner.RunWith;
 import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
 import org.osgi.service.event.EventHandler;
 
 @RunWith(JMock.class)
@@ -47,9 +49,7 @@ public class JobEventHandlerTest extends
     protected Mockery context;
 
     public JobEventHandlerTest() {
-        this.handler = new JobEventHandler();
         this.context = new JUnit4Mockery();
-        ((JobEventHandler)this.handler).scheduler = new SimpleScheduler();
     }
 
     @Override
@@ -58,9 +58,17 @@ public class JobEventHandlerTest extends
     }
 
     @Override
+    protected AbstractRepositoryEventHandler createHandler() {
+        final JobEventHandler h = new JobEventHandler();
+        h.scheduler = new SimpleScheduler();
+        return h;
+    }
+
+    @Override
     protected Dictionary<String, Object> getComponentConfig() {
         final Dictionary<String, Object> config =  super.getComponentConfig();
         config.put("cleanup.period", 1); // set clean up to 1 minute
+        config.put("load.delay", 1); // load delay to 1 sec
         return config;
     }
 
@@ -98,6 +106,29 @@ public class JobEventHandlerTest extends
         if ( parallel != null ) {
             props.put(EventUtil.PROPERTY_JOB_PARALLEL, parallel);
         }
+        return getJobEvent(queueName, id, parallel, false);
+    }
+
+    /**
+     * Helper method to create a job event.
+     */
+    private Event getJobEvent(String queueName, String id, String parallel, boolean runlocal) {
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(EventUtil.PROPERTY_JOB_TOPIC, "sling/test");
+        if ( id != null ) {
+            props.put(EventUtil.PROPERTY_JOB_ID, id);
+        }
+        props.put(EventUtil.PROPERTY_JOB_RETRY_DELAY, 2000L);
+        props.put(EventUtil.PROPERTY_JOB_RETRIES, 2);
+        if ( queueName != null ) {
+            props.put(EventUtil.PROPERTY_JOB_QUEUE_NAME, queueName);
+        }
+        if ( parallel != null ) {
+            props.put(EventUtil.PROPERTY_JOB_PARALLEL, parallel);
+        }
+        if ( runlocal ) {
+            props.put(EventUtil.PROPERTY_JOB_RUN_LOCAL, "true");
+        }
         return new Event(EventUtil.TOPIC_JOB, props);
     }
 
@@ -438,4 +469,107 @@ public class JobEventHandlerTest extends
         assertTrue(handler.getWriterRootNode().hasNode("7"));
         assertTrue(handler.getWriterRootNode().hasNode("8"));
     }
+
+    @org.junit.Test public void testLoad() throws Exception {
+        final List<Integer> retryCountList = new ArrayList<Integer>();
+        final JobEventHandler jeh = (JobEventHandler)this.handler;
+        final Barrier cb = new Barrier(2);
+        final EventAdmin ea = new SimpleEventAdmin(new String[] {"sling/test"},
+                new EventHandler[] {
+                    new EventHandler() {
+                        int retryCount;
+                        public void handleEvent(Event event) {
+                            retryCountList.add(retryCount);
+                            EventUtil.acknowledgeJob(event);
+                            if ( retryCount == 0 ) {
+                                EventUtil.rescheduleJob(event);
+                            } else {
+                                EventUtil.finishedJob(event);
+                            }
+                            retryCount++;
+                            cb.block();
+                        }
+                    }
+                });
+        jeh.eventAdmin = ea;
+        jeh.handleEvent(getJobEvent(null, null, null));
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        this.deactivate();
+        assertEquals("Unexpected number of retries", 1, retryCountList.size());
+        Thread.sleep(3000);
+        assertEquals("Unexpected number of retries", 1, retryCountList.size());
+        this.activate(ea);
+        // the job is retried after loading, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+        assertEquals("Unexpected number of retries", 2, retryCountList.size());
+    }
+
+    @org.junit.Test public void testRunLocal() throws Exception {
+        final List<Integer> retryCountList = new ArrayList<Integer>();
+        final List<String> sessionPath = new ArrayList<String>();
+        JobEventHandler jeh = (JobEventHandler)this.handler;
+        final Barrier cb = new Barrier(2);
+        final EventAdmin ea = new SimpleEventAdmin(new String[] {"sling/test"},
+                new EventHandler[] {
+                    new EventHandler() {
+                        int retryCount;
+                        public void handleEvent(Event event) {
+                            retryCountList.add(retryCount);
+                            EventUtil.acknowledgeJob(event);
+                            if ( retryCount == 0 || retryCount == 1) {
+                                // get the job node from the context
+                                final JobStatusNotifier.NotifierContext ctx = (JobStatusNotifier.NotifierContext) event.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+                                sessionPath.add(ctx.eventNodePath);
+                                EventUtil.rescheduleJob(event);
+                            } else {
+                                EventUtil.finishedJob(event);
+                            }
+                            retryCount++;
+                            cb.block();
+                        }
+                    }
+                });
+        jeh.eventAdmin = ea;
+        // first test: local event and we change the application id
+        jeh.handleEvent(getJobEvent(null, null, null, true));
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        this.deactivate();
+        assertEquals("Unexpected number of retries", 1, retryCountList.size());
+        Thread.sleep(3000);
+        assertEquals("Unexpected number of retries", 1, retryCountList.size());
+        assertEquals("Unexpected number of paths", 1, sessionPath.size());
+        // change app id
+        final String nodePath = sessionPath.get(0);
+        session.getNode(nodePath).setProperty(EventHelper.NODE_PROPERTY_APPLICATION, "unknown");
+        session.save();
+
+        this.activate(ea);
+        jeh = (JobEventHandler)this.handler;
+        // the job is not retried after loading, so we wait again
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+        cb.reset();
+        assertEquals("Unexpected number of retries", 1, retryCountList.size());
+
+        // second test: local event and we don't change the application id
+        jeh.handleEvent(getJobEvent(null, null, null, true));
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        this.deactivate();
+        assertEquals("Unexpected number of retries", 2, retryCountList.size());
+        Thread.sleep(3000);
+        assertEquals("Unexpected number of retries", 2, retryCountList.size());
+        assertEquals("Unexpected number of paths", 2, sessionPath.size());
+
+        this.activate(ea);
+        // the job is retried after loading, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+        cb.reset();
+        assertEquals("Unexpected number of retries", 3, retryCountList.size());
+    }
 }