You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2007/09/26 17:05:16 UTC
svn commit: r579686 - in
/incubator/sling/trunk/event/src/main/java/org/apache/sling/event:
EventUtil.java impl/AbstractRepositoryEventHandler.java
impl/DistributingEventHandler.java impl/JobEventHandler.java
impl/TimedEventHandler.java
Author: cziegeler
Date: Wed Sep 26 08:05:14 2007
New Revision: 579686
URL: http://svn.apache.org/viewvc?rev=579686&view=rev
Log:
Refactor event handler and put all the common stuff into AbstractRepositoryEventHandler. Start new TimedEventHandler.
Added:
incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java (with props)
Modified:
incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java
incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=579686&r1=579685&r2=579686&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/EventUtil.java Wed Sep 26 08:05:14 2007
@@ -49,6 +49,12 @@
/** The topic for jobs. */
public static final String TOPIC_JOB = "org/apache/sling/event/job";
+ /** The topic for timed events. */
+ public static final String TOPIC_TIMED_EVENT = "org/apache/sling/event/timed";
+
+ /** The real topic of the event. */
+ public static final String PROPERTY_TIMED_EVENT_TOPIC = "event.topic.timed";
+
/**
* Create a distributable event.
* A distributable event is distributed across the cluster.
Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java?rev=579686&r1=579685&r2=579686&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java Wed Sep 26 08:05:14 2007
@@ -27,6 +27,8 @@
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import javax.jcr.Node;
import javax.jcr.RepositoryException;
@@ -64,6 +66,9 @@
/** @scr.property value="30" type="Integer" */
protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+ /** @scr.property value="/sling/events" */
+ protected static final String CONFIG_PROPERTY_REPO_PATH = "repository.path";
+
/** @scr.reference */
protected SlingRepository repository;
@@ -82,6 +87,12 @@
/** We remove everything which is older than 30min by default. */
protected int cleanupPeriod;
+ /** Is the background task still running? */
+ protected boolean running;
+
+ /** A local queue for serialising the job processing. */
+ protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
+
/**
* Activate this component.
* @param context
@@ -90,8 +101,17 @@
protected void activate(final ComponentContext context)
throws RepositoryException {
this.applicationId = context.getBundleContext().getProperty(Constants.SLING_ID);
+ this.repositoryPath = (String)context.getProperties().get(CONFIG_PROPERTY_REPO_PATH);
this.cleanupPeriod = (Integer)context.getProperties().get(CONFIG_PROPERTY_CLEANUP_PERIOD);
this.startSession();
+ // start background thread
+ this.running = true;
+ final Thread t = new Thread() {
+ public void run() {
+ runInBackground();
+ }
+ };
+ t.start();
}
/**
@@ -106,6 +126,8 @@
}
}
+ protected abstract void runInBackground();
+
/**
* Clean up the repository.
*/
@@ -116,6 +138,14 @@
* @param context
*/
protected void deactivate(final ComponentContext context) {
+ // stop background thread, by adding a job info to wake it up
+ this.running = false;
+ try {
+ this.queue.put(new EventInfo());
+ } catch (InterruptedException e) {
+ // we ignore this
+ this.ignoreException(e);
+ }
this.stopSession();
}
@@ -313,4 +343,10 @@
this.logger.debug("Ignore exception " + e.getMessage(), e);
}
}
+
+ protected static final class EventInfo {
+ public String nodePath;
+ public Event event;
+ }
+
}
Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java?rev=579686&r1=579685&r2=579686&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java Wed Sep 26 08:05:14 2007
@@ -20,8 +20,6 @@
import java.util.Calendar;
import java.util.Dictionary;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
@@ -33,7 +31,6 @@
import org.apache.jackrabbit.util.ISO8601;
import org.apache.sling.event.EventUtil;
-import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
/**
@@ -41,56 +38,13 @@
* @scr.component inherit="true"
* @scr.property name="event.topics" value="*"
* @scr.property name="event.filter" value="(event.distribute=*)"
- *
+ * @scr.property name="repository.path" value="/sling/events"
*/
public class DistributingEventHandler
extends AbstractRepositoryEventHandler {
- /** @scr.property value="/sling/events" */
- protected static final String CONFIG_PROPERTY_REPO_PATH = "repository.event.path";
-
- /** A local queue for serialising the job processing. */
- protected final BlockingQueue<EventInfo> queue = new LinkedBlockingQueue<EventInfo>();
-
- /** Is the background task still running? */
- protected boolean running;
-
- /**
- * Activate this component.
- * @param context
- * @throws RepositoryException
- */
- protected void activate(final ComponentContext context)
- throws RepositoryException {
- this.repositoryPath = (String)context.getProperties().get(CONFIG_PROPERTY_REPO_PATH);
- super.activate(context);
- // start background thread
- this.running = true;
- final Thread t = new Thread() {
- public void run() {
- runInBackground();
- }
- };
- t.start();
- }
-
/**
- * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext)
- */
- protected void deactivate(ComponentContext context) {
- // stop background thread, by adding a job info to wake it up
- this.running = false;
- try {
- this.queue.put(new EventInfo());
- } catch (InterruptedException e) {
- // we ignore this
- this.ignoreException(e);
- }
- super.deactivate(context);
- }
-
- /**
- * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#cleanUpRepository()
+ * @see org.apache.sling.core.event.impl.JobPersistenceHandler#cleanUpRepository()
*/
protected void cleanUpRepository() {
// we create an own session for concurrency issues
@@ -147,9 +101,9 @@
} catch (Exception e) {
this.logger.error("Exception during writing the event to the repository.", e);
}
- } else if ( info.path != null) {
+ } else if ( info.nodePath != null) {
try {
- final Node eventNode = (Node) this.session.getItem(info.path);
+ final Node eventNode = (Node) this.session.getItem(info.nodePath);
this.eventAdmin.postEvent(this.readEvent(eventNode));
} catch (Exception ex) {
this.logger.error("Exception during reading the event from the repository.", ex);
@@ -181,7 +135,7 @@
final javax.jcr.observation.Event event = iterator.nextEvent();
try {
final EventInfo info = new EventInfo();
- info.path = event.getPath();
+ info.nodePath = event.getPath();
this.queue.put(info);
} catch (InterruptedException ex) {
// we ignore this
@@ -193,7 +147,7 @@
}
/**
- * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#addEventProperties(Node, java.util.Dictionary)
+ * @see org.apache.sling.core.event.impl.JobPersistenceHandler#addEventProperties(Node, java.util.Dictionary)
*/
protected void addEventProperties(Node eventNode, Dictionary<String, Object> properties)
throws RepositoryException {
@@ -211,10 +165,5 @@
super.startSession();
this.session.getWorkspace().getObservationManager()
.addEventListener(this, javax.jcr.observation.Event.NODE_ADDED, this.repositoryPath, true, null, null, true);
- }
-
- protected static final class EventInfo {
- public String path;
- public Event event;
}
}
Modified: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=579686&r1=579685&r2=579686&view=diff
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Sep 26 08:05:14 2007
@@ -24,8 +24,6 @@
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
@@ -51,27 +49,18 @@
* @scr.component inherit="true"
* @scr.service interface="org.apache.sling.event.JobStatusProvider"
* @scr.property name="event.topics" value="org/apache/sling/event/job"
- *
+ * @scr.property name="repository.path" value="/sling/jobs"
*/
public class JobEventHandler
extends AbstractRepositoryEventHandler
- implements Runnable, EventUtil.JobStatusNotifier, JobStatusProvider {
-
- /** @scr.property value="/sling/jobs" */
- protected static final String CONFIG_PROPERTY_REPO_PATH = "repository.job.path";
+ implements EventUtil.JobStatusNotifier, JobStatusProvider {
/** @scr.property value="20" type="Long" */
protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";
- /** A local queue for serialising the job processing. */
- protected final BlockingQueue<JobInfo> queue = new LinkedBlockingQueue<JobInfo>();
-
/** A flag indicating if this handler is currently processing a job. */
protected boolean isProcessing = false;
- /** Is the background task still running? */
- protected boolean running;
-
/** We check every 20 secs by default. */
protected long sleepTime;
@@ -82,21 +71,11 @@
*/
protected void activate(final ComponentContext context)
throws RepositoryException {
- this.repositoryPath = (String)context.getProperties().get(CONFIG_PROPERTY_REPO_PATH);
this.sleepTime = (Long)context.getProperties().get(CONFIG_PROPERTY_SLEEP_TIME) * 1000;
super.activate(context);
// load unprocessed jobs from repository
this.loadJobs();
-
- // start background thread
- this.running = true;
- final Thread t = new Thread() {
- public void run() {
- JobEventHandler.this.runInBackground();
- }
- };
- t.start();
}
/**
@@ -156,7 +135,7 @@
protected void runInBackground() {
while ( this.running ) {
// so let's wait/get the next job from the queue
- JobInfo info = null;
+ EventInfo info = null;
try {
info = this.queue.take();
} catch (InterruptedException e) {
@@ -241,21 +220,6 @@
}
/**
- * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext)
- */
- protected void deactivate(ComponentContext context) {
- // stop background thread, by adding a job info to wake it up
- this.running = false;
- try {
- this.queue.put(new JobInfo());
- } catch (InterruptedException e) {
- // we ignore this
- this.ignoreException(e);
- }
- super.deactivate(context);
- }
-
- /**
* Start the repository session and add this handler as an observer
* for new events created on other nodes.
* @throws RepositoryException
@@ -267,14 +231,14 @@
}
/**
- * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#getContainerNodeType()
+ * @see org.apache.sling.core.event.impl.JobPersistenceHandler#getContainerNodeType()
*/
protected String getContainerNodeType() {
return EventHelper.JOBS_NODE_TYPE;
}
/**
- * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#getEventNodeType()
+ * @see org.apache.sling.core.event.impl.JobPersistenceHandler#getEventNodeType()
*/
protected String getEventNodeType() {
return EventHelper.JOB_NODE_TYPE;
@@ -292,7 +256,7 @@
// job id and job topic must be set, otherwise we ignore this event!
if ( jobId != null && jobTopic != null ) {
// queue the event in order to respond quickly
- final JobInfo info = new JobInfo();
+ final EventInfo info = new EventInfo();
info.event = event;
try {
this.queue.put(info);
@@ -369,7 +333,7 @@
this.processJob(event, eventNode, lock.getLockToken());
} else {
// we don't process the job right now, so unlock and put in local queue
- final JobInfo info = new JobInfo();
+ final EventInfo info = new EventInfo();
info.event = event;
try {
info.nodePath = eventNode.getPath();
@@ -447,7 +411,7 @@
}
/**
- * @see org.apache.sling.core.event.impl.AbstractRepositoryEventHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
+ * @see org.apache.sling.core.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
*/
protected void addNodeProperties(Node eventNode, Event event)
throws RepositoryException {
@@ -473,7 +437,7 @@
if ( !eventNode.isLocked()
&& eventNode.hasProperty(EventHelper.NODE_PROPERTY_ACTIVE)
&& eventNode.getProperty(EventHelper.NODE_PROPERTY_ACTIVE).getBoolean() ) {
- final JobInfo info = new JobInfo();
+ final EventInfo info = new EventInfo();
info.event = this.readEvent(eventNode);
info.nodePath = event.getPath();
try {
@@ -547,7 +511,7 @@
final Node eventNode = result.nextNode();
if ( !eventNode.isLocked() ) {
final Event event = this.readEvent(eventNode);
- final JobInfo info = new JobInfo();
+ final EventInfo info = new EventInfo();
info.event = event;
info.nodePath = eventNode.getPath();
try {
@@ -596,7 +560,7 @@
}
}
if ( reschedule ) {
- final JobInfo info = new JobInfo();
+ final EventInfo info = new EventInfo();
try {
info.event = job;
info.nodePath = eventNode.getPath();
@@ -680,11 +644,5 @@
*/
public Collection<Event> scheduledJobs(String topic) {
return this.queryCurrentJobs(topic, false);
- }
-
-
- protected static final class JobInfo {
- public Event event;
- public String nodePath;
}
}
Added: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java?rev=579686&view=auto
==============================================================================
--- incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java (added)
+++ incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java Wed Sep 26 08:05:14 2007
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import javax.jcr.RepositoryException;
+
+
+/**
+ * An event handler for timed events.
+ *
+ * scr.component inherit="true"
+ * @scr.property name="event.topics" value="org/apache/sling/event/timed"
+ * @scr.property name="repository.path" value="/sling/timed-events"
+ */
+public abstract class TimedEventHandler
+ extends AbstractRepositoryEventHandler{
+
+ /**
+ * Start the repository session and add this handler as an observer
+ * for new events created on other nodes.
+ * @throws RepositoryException
+ */
+ protected void startSession() throws RepositoryException {
+ super.startSession();
+ this.session.getWorkspace().getObservationManager()
+ .addEventListener(this, javax.jcr.observation.Event.PROPERTY_CHANGED, this.repositoryPath, true, null, null, true);
+ }
+
+}
Propchange: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/sling/trunk/event/src/main/java/org/apache/sling/event/impl/TimedEventHandler.java
------------------------------------------------------------------------------
svn:keywords = author date id revision url