You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2007/09/26 17:05:16 UTC

svn commit: r579686 - in /incubator/sling/trunk/event/src/main/java/org/apache/sling/event: EventUtil.java impl/AbstractRepositoryEventHandler.java impl/DistributingEventHandler.java impl/JobEventHandler.java impl/TimedEventHandler.java

Author: cziegeler
Date: Wed Sep 26 08:05:14 2007
New Revision: 579686

URL: http://svn.apache.org/viewvc?rev=579686&view=rev
Log:
Refactor event handler and put all the common stuff into AbstractRepositoryEventHandler. Start new TimedEventHandler.

Added:
    incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java   (with props)
Modified:
    incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java
    incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
    incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
    incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=579686&r1=579685&r2=579686&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java Wed Sep 26 08:05:14 2007
@@ -49,6 +49,12 @@
     /** The topic for jobs. */
     public static final String TOPIC_JOB = "org/apache/sling/event/job";
 
+    /** The topic for timed events. */
+    public static final String TOPIC_TIMED_EVENT = "org/apache/sling/event/timed";
+
+    /** The real topic of the event. */
+    public static final String PROPERTY_TIMED_EVENT_TOPIC = "event.topic.timed";
+
     /**
      * Create a distributable event.
      * A distributable event is distributed across the cluster.

Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java?rev=579686&r1=579685&r2=579686&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java Wed Sep 26 08:05:14 2007
@@ -27,6 +27,8 @@
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jcr.Node;
 import javax.jcr.RepositoryException;
@@ -64,6 +66,9 @@
     /** @scr.property value="30" type="Integer" */
     protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
 
+    /** @scr.property value="/sling/events" */
+    protected static final String CONFIG_PROPERTY_REPO_PATH = "repository.path";
+
     /** @scr.reference */
     protected SlingRepository repository;
 
@@ -82,6 +87,12 @@
     /** We remove everything which is older than 30min by default. */
     protected int cleanupPeriod;
 
+    /** Is the background task still running? */
+    protected boolean running;
+
+    /** A local queue for serialising the job processing. */
+    protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
+
     /**
      * Activate this component.
      * @param context
@@ -90,8 +101,17 @@
     protected void activate(final ComponentContext context)
     throws RepositoryException {
         this.applicationId = context.getBundleContext().getProperty(Constants.SLING_ID);
+        this.repositoryPath = (String)context.getProperties().get(CONFIG_PROPERTY_REPO_PATH);
         this.cleanupPeriod = (Integer)context.getProperties().get(CONFIG_PROPERTY_CLEANUP_PERIOD);
         this.startSession();
+        // start background thread
+        this.running = true;
+        final Thread t = new Thread() {
+            public void run() {
+                runInBackground();
+            }
+        };
+        t.start();
     }
 
     /**
@@ -106,6 +126,8 @@
         }
     }
 
+    protected abstract void runInBackground();
+
     /**
      * Clean up the repository.
      */
@@ -116,6 +138,14 @@
      * @param context
      */
     protected void deactivate(final ComponentContext context) {
+        // stop background thread, by adding a job info to wake it up
+        this.running = false;
+        try {
+            this.queue.put(new EventInfo());
+        } catch (InterruptedException e) {
+            // we ignore this
+            this.ignoreException(e);
+        }
         this.stopSession();
     }
 
@@ -313,4 +343,10 @@
             this.logger.debug("Ignore exception " + e.getMessage(), e);
         }
     }
+
+    protected static final class EventInfo {
+        public String nodePath;
+        public Event event;
+    }
+
 }

Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=579686&r1=579685&r2=579686&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Wed Sep 26 08:05:14 2007
@@ -20,8 +20,6 @@
 
 import java.util.Calendar;
 import java.util.Dictionary;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
@@ -33,7 +31,6 @@
 
 import org.apache.jackrabbit.util.ISO8601;
 import org.apache.sling.event.EventUtil;
-import org.osgi.service.component.ComponentContext;
 import org.osgi.service.event.Event;
 
 /**
@@ -41,56 +38,13 @@
  * @scr.component inherit="true"
  * @scr.property name="event.topics" value="*"
  * @scr.property name="event.filter" value="(event.distribute=*)"
- *
+ * @scr.property name="repository.path" value="/sling/events"
  */
 public class DistributingEventHandler
     extends AbstractRepositoryEventHandler {
 
-    /** @scr.property value="/sling/events" */
-    protected static final String CONFIG_PROPERTY_REPO_PATH = "repository.event.path";
-
-    /** A local queue for serialising the job processing. */
-    protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
-
-    /** Is the background task still running? */
-    protected boolean running;
-
-    /**
-     * Activate this component.
-     * @param context
-     * @throws RepositoryException
-     */
-    protected void activate(final ComponentContext context)
-    throws RepositoryException {
-        this.repositoryPath = (String)context.getProperties().get(CONFIG_PROPERTY_REPO_PATH);
-        super.activate(context);
-        // start background thread
-        this.running = true;
-        final Thread t = new Thread() {
-            public void run() {
-                runInBackground();
-            }
-        };
-        t.start();
-    }
-
     /**
-     * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext)
-     */
-    protected void deactivate(ComponentContext context) {
-        // stop background thread, by adding a job info to wake it up
-        this.running = false;
-        try {
-            this.queue.put(new EventInfo());
-        } catch (InterruptedException e) {
-            // we ignore this
-            this.ignoreException(e);
-        }
-        super.deactivate(context);
-    }
-
-    /**
-     * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#cleanUpRepository()
+     * @see org.apache.sling.core.event.impl.JobPersistenceHandler#cleanUpRepository()
      */
     protected void cleanUpRepository() {
         // we create an own session for concurrency issues
@@ -147,9 +101,9 @@
                     } catch (Exception e) {
                         this.logger.error("Exception during writing the event to the repository.", e);
                     }
-                } else if ( info.path != null) {
+                } else if ( info.nodePath != null) {
                     try {
-                        final Node eventNode = (Node) this.session.getItem(info.path);
+                        final Node eventNode = (Node) this.session.getItem(info.nodePath);
                         this.eventAdmin.postEvent(this.readEvent(eventNode));
                     } catch (Exception ex) {
                         this.logger.error("Exception during reading the event from the repository.", ex);
@@ -181,7 +135,7 @@
             final javax.jcr.observation.Event event = iterator.nextEvent();
             try {
                 final EventInfo info = new EventInfo();
-                info.path = event.getPath();
+                info.nodePath = event.getPath();
                 this.queue.put(info);
             } catch (InterruptedException ex) {
                 // we ignore this
@@ -193,7 +147,7 @@
     }
 
     /**
-     * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#addEventProperties(Node, java.util.Dictionary)
+     * @see org.apache.sling.core.event.impl.JobPersistenceHandler#addEventProperties(Node, java.util.Dictionary)
      */
     protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
     throws RepositoryException {
@@ -211,10 +165,5 @@
         super.startSession();
         this.session.getWorkspace().getObservationManager()
             .addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true);
-    }
-
-    protected static final class EventInfo {
-        public String path;
-        public Event event;
     }
 }

Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=579686&r1=579685&r2=579686&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Sep 26 08:05:14 2007
@@ -24,8 +24,6 @@
 import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.jcr.Node;
 import javax.jcr.NodeIterator;
@@ -51,27 +49,18 @@
  * @scr.component inherit="true"
  * @scr.service interface="org.apache.sling.event.JobStatusProvider"
  * @scr.property name="event.topics" value="org/apache/sling/event/job"
- *
+ * @scr.property name="repository.path" value="/sling/jobs"
  */
 public class JobEventHandler
     extends AbstractRepositoryEventHandler
-    implements Runnable, EventUtil.JobStatusNotifier, JobStatusProvider {
-
-    /** @scr.property value="/sling/jobs" */
-    protected static final String CONFIG_PROPERTY_REPO_PATH = "repository.job.path";
+    implements EventUtil.JobStatusNotifier, JobStatusProvider {
 
     /** @scr.property value="20" type="Long" */
     protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
 
-    /** A local queue for serialising the job processing. */
-    protected final BlockingQueue<JobInfo> queue = new LinkedBlockingQueue<JobInfo>();
-
     /** A flag indicating if this handler is currently processing a job. */
     protected boolean isProcessing = false;
 
-    /** Is the background task still running? */
-    protected boolean running;
-
     /** We check every 20 secs by default. */
     protected long sleepTime;
 
@@ -82,21 +71,11 @@
      */
     protected void activate(final ComponentContext context)
     throws RepositoryException {
-        this.repositoryPath = (String)context.getProperties().get(CONFIG_PROPERTY_REPO_PATH);
         this.sleepTime = (Long)context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) * 1000;
 
         super.activate(context);
         // load unprocessed jobs from repository
         this.loadJobs();
-
-        // start background thread
-        this.running = true;
-        final Thread t = new Thread() {
-            public void run() {
-                JobEventHandler.this.runInBackground();
-            }
-        };
-        t.start();
     }
 
     /**
@@ -156,7 +135,7 @@
     protected void runInBackground() {
         while ( this.running ) {
             // so let's wait/get the next job from the queue
-            JobInfo info = null;
+            EventInfo info = null;
             try {
                 info = this.queue.take();
             } catch (InterruptedException e) {
@@ -241,21 +220,6 @@
     }
 
     /**
-     * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext)
-     */
-    protected void deactivate(ComponentContext context) {
-        // stop background thread, by adding a job info to wake it up
-        this.running = false;
-        try {
-            this.queue.put(new JobInfo());
-        } catch (InterruptedException e) {
-            // we ignore this
-            this.ignoreException(e);
-        }
-        super.deactivate(context);
-    }
-
-    /**
      * Start the repository session and add this handler as an observer
      * for new events created on other nodes.
      * @throws RepositoryException
@@ -267,14 +231,14 @@
     }
 
     /**
-     * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#getContainerNodeType()
+     * @see org.apache.sling.core.event.impl.JobPersistenceHandler#getContainerNodeType()
      */
     protected String getContainerNodeType() {
         return EventHelper.JOBS_NODE_TYPE;
     }
 
     /**
-     * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#getEventNodeType()
+     * @see org.apache.sling.core.event.impl.JobPersistenceHandler#getEventNodeType()
      */
     protected String getEventNodeType() {
         return EventHelper.JOB_NODE_TYPE;
@@ -292,7 +256,7 @@
             //  job id  and job topic must be set, otherwise we ignore this event!
             if ( jobId != null && jobTopic != null ) {
                 // queue the event in order to respond quickly
-                final JobInfo info = new JobInfo();
+                final EventInfo info = new EventInfo();
                 info.event = event;
                 try {
                     this.queue.put(info);
@@ -369,7 +333,7 @@
                     this.processJob(event, eventNode, lock.getLockToken());
                 } else {
                     // we don't process the job right now, so unlock and put in local queue
-                    final JobInfo info = new JobInfo();
+                    final EventInfo info = new EventInfo();
                     info.event = event;
                     try {
                         info.nodePath = eventNode.getPath();
@@ -447,7 +411,7 @@
     }
 
     /**
-     * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
+     * @see org.apache.sling.core.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
      */
     protected void addNodeProperties(Node eventNode, Event event)
     throws RepositoryException {
@@ -473,7 +437,7 @@
                         if ( !eventNode.isLocked()
                              && eventNode.hasProperty(EventHelper.NODE_PROPERTY_ACTIVE)
                              && eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
-                            final JobInfo info = new JobInfo();
+                            final EventInfo info = new EventInfo();
                             info.event = this.readEvent(eventNode);
                             info.nodePath = event.getPath();
                             try {
@@ -547,7 +511,7 @@
             final Node eventNode = result.nextNode();
             if ( !eventNode.isLocked() ) {
                 final Event event = this.readEvent(eventNode);
-                final JobInfo info = new JobInfo();
+                final EventInfo info = new EventInfo();
                 info.event = event;
                 info.nodePath = eventNode.getPath();
                 try {
@@ -596,7 +560,7 @@
                 }
             }
             if ( reschedule ) {
-                final JobInfo info = new JobInfo();
+                final EventInfo info = new EventInfo();
                 try {
                     info.event = job;
                     info.nodePath = eventNode.getPath();
@@ -680,11 +644,5 @@
      */
     public Collection<Event> scheduledJobs(String topic) {
         return this.queryCurrentJobs(topic, false);
-    }
-
-
-    protected static final class JobInfo {
-        public Event event;
-        public String nodePath;
     }
 }

Added: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java?rev=579686&view=auto
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java (added)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java Wed Sep 26 08:05:14 2007
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import javax.jcr.RepositoryException;
+
+
+/**
+ * An event handler for timed events.
+ *
+ * scr.component inherit="true"
+ * @scr.property name="event.topics" value="org/apache/sling/event/timed"
+ * @scr.property name="repository.path" value="/sling/timed-events"
+ */
+public abstract class TimedEventHandler
+    extends AbstractRepositoryEventHandler{
+
+    /**
+     * Start the repository session and add this handler as an observer
+     * for new events created on other nodes.
+     * @throws RepositoryException
+     */
+    protected void startSession() throws RepositoryException {
+        super.startSession();
+        this.session.getWorkspace().getObservationManager()
+            .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED, this.repositoryPath, true, null, null, true);
+    }
+
+}

Propchange: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision url