You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2008/02/14 10:25:14 UTC

svn commit: r627686 - /incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Author: cziegeler
Date: Thu Feb 14 01:25:13 2008
New Revision: 627686

URL: http://svn.apache.org/viewvc?rev=627686&view=rev
Log:
Synchronize on session

Modified:
    incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=627686&r1=627685&r2=627686&view=diff
==============================================================================
--- incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/sling/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Thu Feb 14 01:25:13 2008
@@ -224,82 +224,86 @@
 
             if ( info != null && this.running ) {
                 // check if the node still exists
-                try {
-                    if ( this.backgroundSession.itemExists(info.nodePath) ) {
-                        final Event event = info.event;
-                        final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-                        final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
-
-                        // check how we can process this job
-                        // if parallel processing is allowed, we can just process
-                        // if not we should check if any other job with the same topic is currently running
-                        boolean process = parallelProcessing;
-                        if ( !process ) {
-                            synchronized ( this.processingMap ) {
-                                final Boolean value = this.processingMap.get(jobTopic);
-                                if ( value == null || !value.booleanValue() ) {
-                                    this.processingMap.put(jobTopic, Boolean.TRUE);
-                                    process = true;
+                synchronized (this.backgroundSession) {
+                    try {
+                        this.backgroundSession.refresh(false);
+                        if ( this.backgroundSession.itemExists(info.nodePath) ) {
+                            final Event event = info.event;
+                            final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+                            final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
+
+                            // check how we can process this job
+                            // if parallel processing is allowed, we can just process
+                            // if not we should check if any other job with the same topic is currently running
+                            boolean process = parallelProcessing;
+                            if ( !process ) {
+                                synchronized ( this.processingMap ) {
+                                    final Boolean value = this.processingMap.get(jobTopic);
+                                    if ( value == null || !value.booleanValue() ) {
+                                        this.processingMap.put(jobTopic, Boolean.TRUE);
+                                        process = true;
+                                    }
                                 }
-                            }
 
-                        }
-                        if ( process ) {
-                            boolean unlock = true;
-                            try {
-                                final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
-                                if ( !eventNode.isLocked() ) {
-                                    // lock node
-                                    try {
-                                        eventNode.lock(false, true);
-                                    } catch (RepositoryException re) {
-                                        // lock failed which means that the node is locked by someone else, so we don't have to requeue
-                                        process = false;
-                                    }
-                                    if ( process ) {
-                                        unlock = false;
-                                        this.processJob(info.event, eventNode);
+                            }
+                            if ( process ) {
+                                boolean unlock = true;
+                                try {
+                                    final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+                                    if ( !eventNode.isLocked() ) {
+                                        // lock node
+                                        try {
+                                            eventNode.lock(false, true);
+                                        } catch (RepositoryException re) {
+                                            // lock failed which means that the node is locked by someone else, so we don't have to requeue
+                                            process = false;
+                                        }
+                                        if ( process ) {
+                                            unlock = false;
+                                            this.processJob(info.event, eventNode);
+                                        }
                                     }
-                                }
-                            } catch (RepositoryException e) {
-                                // ignore
-                                this.ignoreException(e);
-                            } finally {
-                                if ( unlock && !parallelProcessing ) {
-                                    synchronized ( this.processingMap ) {
-                                        this.processingMap.put(jobTopic, Boolean.FALSE);
+                                } catch (RepositoryException e) {
+                                    // ignore
+                                    this.ignoreException(e);
+                                } finally {
+                                    if ( unlock && !parallelProcessing ) {
+                                        synchronized ( this.processingMap ) {
+                                            this.processingMap.put(jobTopic, Boolean.FALSE);
+                                        }
                                     }
                                 }
-                            }
-                        } else {
-                            try {
-                                // check if the node is in processing or already finished
-                                final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
-                                if ( !eventNode.isLocked() ) {
-                                    try {
-                                        this.queue.put(info);
-                                    } catch (InterruptedException e) {
-                                        // ignore
-                                        this.ignoreException(e);
-                                    }
-                                    // wait time before we restart the cycle, if there is only one job in the queue!
-                                    if ( this.queue.size() == 1 ) {
+                            } else {
+                                try {
+                                    // check if the node is in processing or already finished
+                                    final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
+                                    if ( !eventNode.isLocked() ) {
                                         try {
-                                            Thread.sleep(this.sleepTime);
+                                            this.queue.put(info);
                                         } catch (InterruptedException e) {
                                             // ignore
                                             this.ignoreException(e);
                                         }
+                                        // wait time before we restart the cycle, if there is only one job in the queue!
+                                        if ( this.queue.size() == 1 ) {
+                                            try {
+                                                Thread.sleep(this.sleepTime);
+                                            } catch (InterruptedException e) {
+                                                // ignore
+                                                this.ignoreException(e);
+                                            }
+                                        }
                                     }
+                                } catch (RepositoryException e) {
+                                    // ignore
+                                    this.ignoreException(e);
                                 }
-                            } catch (RepositoryException e) {
-                                // ignore
-                                this.ignoreException(e);
                             }
                         }
+                    } catch (RepositoryException re) {
+                        this.ignoreException(re);
                     }
-                } catch (RepositoryException re) {
-                    this.ignoreException(re);
+
                 }
             }
         }
@@ -432,7 +436,7 @@
         final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
         boolean unlock = true;
         try {
-            final Event jobEvent = this.getJobEvent(event, eventNode);
+            final Event jobEvent = this.getJobEvent(event, eventNode.getPath());
             eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
             eventNode.save();
             final EventAdmin localEA = this.eventAdmin;
@@ -469,8 +473,7 @@
      * @param e
      * @return
      */
-    protected Event getJobEvent(Event e, Node eventNode)
-    throws RepositoryException {
+    protected Event getJobEvent(Event e, String nodePath) {
         final String eventTopic = (String)e.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
         final Dictionary<String, Object> properties = new Hashtable<String, Object>();
         final String[] propertyNames = e.getPropertyNames();
@@ -478,7 +481,7 @@
             properties.put(propertyNames[i], e.getProperty(propertyNames[i]));
         }
         // put properties for finished job callback
-        properties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME, new EventUtil.JobStatusNotifier.NotifierContext(this, eventNode.getPath()));
+        properties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME, new EventUtil.JobStatusNotifier.NotifierContext(this, nodePath));
         return new Event(eventTopic, properties);
     }
 
@@ -632,97 +635,99 @@
         }
         final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
         // we have to use the same session for unlocking that we used for locking!
-        // TODO - we have to bring the session into the same thread!
-        try {
-            final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
-            boolean unlock = true;
+        synchronized ( this.backgroundSession ) {
             try {
-                if ( !reschedule ) {
-                    synchronized ( this.deletedJobs ) {
-                        this.deletedJobs.add(eventNodePath);
+                this.backgroundSession.refresh(false);
+                final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
+                boolean unlock = true;
+                try {
+                    if ( !reschedule ) {
+                        synchronized ( this.deletedJobs ) {
+                            this.deletedJobs.add(eventNodePath);
+                        }
+                        // unlock node
+                        try {
+                            eventNode.unlock();
+                        } catch (RepositoryException e) {
+                            // if unlock fails, we silently ignore this
+                            this.ignoreException(e);
+                        }
+                        unlock = false;
+                        // remove node from repository
+                        final Node parentNode = eventNode.getParent();
+                        eventNode.remove();
+                        parentNode.save();
                     }
-                    // unlock node
-                    try {
-                        eventNode.unlock();
-                    } catch (RepositoryException e) {
-                        // if unlock fails, we silently ignore this
-                        this.ignoreException(e);
+                } catch (RepositoryException re) {
+                    // if an exception occurs, we just log
+                    this.logger.error("Exception during job finishing.", re);
+                } finally {
+                    if ( !parallelProcessing) {
+                        final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+                        synchronized ( this.processingMap ) {
+                            this.processingMap.put(jobTopic, Boolean.FALSE);
+                        }
                     }
-                    unlock = false;
-                    // remove node from repository
-                    final Node parentNode = eventNode.getParent();
-                    eventNode.remove();
-                    parentNode.save();
-                }
-            } catch (RepositoryException re) {
-                // if an exception occurs, we just log
-                this.logger.error("Exception during job finishing.", re);
-            } finally {
-                if ( !parallelProcessing) {
-                    final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-                    synchronized ( this.processingMap ) {
-                        this.processingMap.put(jobTopic, Boolean.FALSE);
+                    if ( unlock ) {
+                        synchronized ( this.deletedJobs ) {
+                            this.deletedJobs.add(eventNodePath);
+                        }
+                        // unlock node
+                        try {
+                            eventNode.unlock();
+                        } catch (RepositoryException e) {
+                            // if unlock fails, we silently ignore this
+                            this.ignoreException(e);
+                        }
                     }
                 }
-                if ( unlock ) {
-                    synchronized ( this.deletedJobs ) {
-                        this.deletedJobs.add(eventNodePath);
-                    }
-                    // unlock node
+                if ( reschedule ) {
+                    final EventInfo info = new EventInfo();
                     try {
-                        eventNode.unlock();
+                        info.event = job;
+                        info.nodePath = eventNode.getPath();
                     } catch (RepositoryException e) {
-                        // if unlock fails, we silently ignore this
+                        // this should never happen
                         this.ignoreException(e);
                     }
-                }
-            }
-            if ( reschedule ) {
-                final EventInfo info = new EventInfo();
-                try {
-                    info.event = job;
-                    info.nodePath = eventNode.getPath();
-                } catch (RepositoryException e) {
-                    // this should never happen
-                    this.ignoreException(e);
-                }
-                // delay rescheduling?
-                if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
-                    final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
-                    final Thread t = new Thread() {
-                        public void run() {
-                            try {
-                                Thread.sleep(delay);
-                            } catch (InterruptedException e) {
-                                // this should never happen
-                                ignoreException(e);
-                            }
-                            try {
-                                queue.put(info);
-                            } catch (InterruptedException e) {
-                                // this should never happen
-                                ignoreException(e);
+                    // delay rescheduling?
+                    if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+                        final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+                        final Thread t = new Thread() {
+                            public void run() {
+                                try {
+                                    Thread.sleep(delay);
+                                } catch (InterruptedException e) {
+                                    // this should never happen
+                                    ignoreException(e);
+                                }
+                                try {
+                                    queue.put(info);
+                                } catch (InterruptedException e) {
+                                    // this should never happen
+                                    ignoreException(e);
+                                }
                             }
+                        };
+                        t.start();
+                    } else {
+                        // put directly into queue
+                        try {
+                            this.queue.put(info);
+                        } catch (InterruptedException e) {
+                            // this should never happen
+                            this.ignoreException(e);
                         }
-                    };
-                    t.start();
-                } else {
-                    // put directly into queue
-                    try {
-                        this.queue.put(info);
-                    } catch (InterruptedException e) {
-                        // this should never happen
-                        this.ignoreException(e);
                     }
                 }
+                if ( !shouldReschedule ) {
+                    return true;
+                }
+                return reschedule;
+            } catch (RepositoryException re) {
+                this.logger.error("Unable to create new session.", re);
+                return false;
             }
-            if ( !shouldReschedule ) {
-                return true;
-            }
-            return reschedule;
-        } catch (RepositoryException re) {
-            this.logger.error("Unable to create new session.", re);
-            return false;
         }
     }