You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/11 08:54:14 UTC
svn commit: r1021247 [5/6] - in /sling/branches/eventing-3.0: ./ .settings/
src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/...
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * An ordered job queue is processing the queue FIFO in a serialized
+ * way. If a job fails it is rescheduled and the reschedule is processed
+ * next - this basically means that failing jobs block the queue
+ * until they are finished!
+ */
+public final class OrderedJobQueue extends AbstractJobQueue {
+
+ /** The job event for rescheduling. */
+ private JobEvent jobEvent;
+
+ /** Marker indicating that this queue is currently sleeping. */
+ private volatile boolean isSleeping = false;
+
+ /** The sleeping thread. */
+ private volatile Thread sleepingThread;
+
+ /** The queue. */
+ private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
+
+ public OrderedJobQueue(final String name,
+ final InternalQueueConfiguration config,
+ final EnvironmentComponent env) {
+ super(name, config, env);
+ }
+
+ @Override
+ public String getStatusInfo() {
+ return super.getStatusInfo() + ", isSleeping=" + this.isSleeping;
+ }
+
+ @Override
+ protected JobEvent start(final JobEvent processInfo) {
+ JobEvent rescheduleInfo = null;
+
+ // if we are ordered we simply wait for the finish
+ if ( this.executeJob(processInfo) ) {
+ rescheduleInfo = this.waitForFinish();
+ }
+ return rescheduleInfo;
+ }
+
+ private void setSleeping(boolean flag) {
+ this.isSleeping = flag;
+ if ( !flag ) {
+ this.sleepingThread = null;
+ }
+ }
+
+ private void setSleeping(Thread sleepingThread) {
+ this.sleepingThread = sleepingThread;
+ this.setSleeping(true);
+ }
+
+ @Override
+ public void resume() {
+ if ( this.isSleeping ) {
+ final Thread thread = this.sleepingThread;
+ if ( thread != null ) {
+ thread.interrupt();
+ }
+ }
+ super.resume();
+ }
+
+ /**
+ * Wait for the job to be finished.
+ * This is called if the queue is ordered.
+ */
+ private JobEvent waitForFinish() {
+ synchronized ( this ) {
+ this.isWaiting = true;
+ this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
+ while ( this.isWaiting ) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ this.logger.debug("Job queue {} is continuing.", this.queueName);
+ final JobEvent object = this.jobEvent;
+ this.jobEvent = null;
+ return object;
+ }
+ }
+
+ @Override
+ protected void put(final JobEvent event) {
+ try {
+ this.queue.put(event);
+ } catch (final InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ }
+
+ @Override
+ protected JobEvent take() {
+ try {
+ return this.queue.take();
+ } catch (final InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ return null;
+ }
+
+ @Override
+ protected boolean isEmpty() {
+ return this.queue.isEmpty();
+ }
+
+ @Override
+ protected void notifyFinished(final JobEvent rescheduleInfo) {
+ this.jobEvent = rescheduleInfo;
+ this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
+ this.isWaiting = false;
+ synchronized ( this ) {
+ this.notify();
+ }
+ }
+
+ @Override
+ protected JobEvent reschedule(final JobEvent info) {
+ // we just sleep for the delay time - if none, we continue and retry
+ // this job again
+ long delay = this.configuration.getRetryDelayInMs();
+ if ( info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ delay = (Long)info.event.getProperty(JobUtil.PROPERTY_JOB_RETRY_DELAY);
+ }
+ if ( delay > 0 ) {
+ setSleeping(Thread.currentThread());
+ try {
+ this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ } finally {
+ setSleeping(false);
+ }
+ }
+ return info;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#clear()
+ */
+ public void clear() {
+ this.queue.clear();
+ super.clear();
+ }
+
+ @Override
+ public void removeAll() {
+ this.jobEvent = null;
+ super.removeAll();
+ }
+}
+
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+
+/**
+ * The default parallel job queue processing the entries FIFO.
+ * Failing jobs are rescheduled and put at the end of the queue.
+ * The jobs are kept in an unbounded {@link LinkedBlockingQueue}.
+ */
+public final class ParallelJobQueue extends AbstractParallelJobQueue {
+
+    /** The queue. */
+    private final BlockingQueue<JobEvent> queue = new LinkedBlockingQueue<JobEvent>();
+
+    public ParallelJobQueue(final String name,
+                            final InternalQueueConfiguration config,
+                            final EnvironmentComponent env,
+                            final Scheduler scheduler) {
+        super(name, config, env, scheduler);
+    }
+
+    /**
+     * Append the job to the end of the queue.
+     * @param event The job to add.
+     */
+    @Override
+    protected void put(final JobEvent event) {
+        try {
+            this.queue.put(event);
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Take the job at the head of the queue, blocking until one is available.
+     * @return The next job or <code>null</code> if the thread got interrupted.
+     */
+    @Override
+    protected JobEvent take() {
+        try {
+            return this.queue.take();
+        } catch (final InterruptedException e) {
+            // this should never happen
+            this.ignoreException(e);
+        }
+        return null;
+    }
+
+    @Override
+    protected boolean isEmpty() {
+        return this.queue.isEmpty();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Queue#clear()
+     */
+    @Override
+    public void clear() {
+        this.queue.clear();
+        super.clear();
+    }
+}
+
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.jobs.JobUtil;
+
+/**
+ * This queue acts similar to the parallel job queue. Except that
+ * new jobs are selected based on a round robin topic selection scheme.
+ * Failing jobs are rescheduled and put at the end of the queue.
+ */
+public final class TopicRoundRobinJobQueue extends AbstractParallelJobQueue {
+
+ /** The topic set. */
+ private final List<String> topics = new ArrayList<String>();
+
+ /** The topic map. */
+ private final Map<String, List<JobEvent>> topicMap = new HashMap<String, List<JobEvent>>();
+
+ /** Topic index. */
+ private int topicIndex;
+
+ /** Event count. */
+ private int eventCount;
+
+ private boolean isWaitingForNext = false;
+
+ public TopicRoundRobinJobQueue(final String name,
+ final InternalQueueConfiguration config,
+ final EnvironmentComponent env,
+ final Scheduler scheduler) {
+ super(name, config, env, scheduler);
+ }
+
+ @Override
+ public String getStatusInfo() {
+ return super.getStatusInfo() + ", eventCount=" + this.eventCount + ", isWaitingForNext=" + this.isWaitingForNext;
+ }
+
+ @Override
+ protected boolean canBeMarkedForCleanUp() {
+ boolean result = super.canBeMarkedForCleanUp();
+ if ( result ) {
+ result = !this.isWaitingForNext;
+ }
+ return result;
+ }
+
+ @Override
+ protected void put(final JobEvent event) {
+ final String topic = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+ synchronized ( this.topicMap ) {
+ List<JobEvent> events = this.topicMap.get(topic);
+ if ( events == null ) {
+ events = new LinkedList<JobEvent>();
+ this.topicMap.put(topic, events);
+ this.topics.add(topic);
+ }
+ events.add(event);
+ this.eventCount++;
+ if ( this.isWaitingForNext ) {
+ this.isWaitingForNext = false;
+ // wake up take()
+ this.topicMap.notify();
+ }
+ }
+ }
+
+ @Override
+ protected JobEvent take() {
+ JobEvent e = null;
+ synchronized ( this.topicMap ) {
+ if ( this.eventCount == 0 ) {
+ // wait for a new event
+ this.isWaitingForNext = true;
+ while ( this.isWaitingForNext ) {
+ try {
+ this.wait();
+ } catch (final InterruptedException ie) {
+ this.ignoreException(ie);
+ }
+ }
+ }
+ if ( this.eventCount > 0 ) {
+ while ( e == null ) {
+ final String topic = this.topics.get(this.topicIndex);
+ final List<JobEvent> events = this.topicMap.get(topic);
+ if ( events.size() > 0 ) {
+ e = events.remove(0);
+ }
+ this.topicIndex++;
+ if ( this.topicIndex == this.topics.size() ) {
+ this.topicIndex = 0;
+ }
+ }
+ this.eventCount--;
+ }
+ }
+ return e;
+ }
+
+ @Override
+ protected boolean isEmpty() {
+ synchronized ( this.topicMap ) {
+ return this.eventCount == 0;
+ }
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Queue#clear()
+ */
+ public void clear() {
+ synchronized ( this.topicMap ) {
+ this.eventCount = 0;
+ this.topics.clear();
+ this.topicMap.clear();
+ }
+ super.clear();
+ }
+}
+
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.support;
+
+import org.apache.sling.commons.threads.ThreadPool;
+
+/**
+ * This class provides "global settings"
+ * to all services, like the application id and the thread pool.
+ * Both settings are plain mutable static fields; they are declared
+ * volatile so that a new value is visible to all reading threads.
+ * @since 3.0
+ */
+public class Environment {
+
+    /** Global application id. */
+    public static volatile String APPLICATION_ID;
+
+    /** Global thread pool. */
+    public static volatile ThreadPool THREAD_POOL;
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/support/Environment.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+import java.util.Map;
+
+import org.osgi.service.event.Event;
+
+
+/**
+ * The job manager is the heart of the job event handling.
+ * It can be used to manage and monitor the queues, and to
+ * query, find and remove jobs.
+ * @since 3.0
+ */
+public interface JobManager {
+
+ /**
+ * Return statistics information about all queues.
+ * @return The statistics about all queues.
+ */
+ Statistics getStatistics();
+
+ /**
+ * Return a queue with a specific name (if running)
+ * @param name The queue name
+ * @return The queue or <code>null</code>
+ */
+ Queue getQueue(String name);
+
+ /**
+ * Return an iterator for all available queues.
+ * @return An iterable over all available queues.
+ */
+ Iterable<Queue> getQueues();
+
+ /**
+ * The requested job types for the query.
+ * This can either be all jobs, all activated (started) or all queued jobs.
+ */
+ enum QueryType {
+ ALL,
+ ACTIVE,
+ QUEUED
+ }
+
+ /**
+ * Return all jobs either running or scheduled.
+ *
+ * @param type Required parameter for the type: either all jobs, only queued or only started can be returned.
+ * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
+ * @param templates A list of filter property maps. Each map acts like a template. The searched job
+ * must match the template (AND query). By providing several maps, different filters
+ * are possible (OR query).
+ * @return A non null {@link JobsIterator} for the matching jobs.
+ */
+ JobsIterator queryJobs(QueryType type, String topic, Map<String, Object>... templates);
+
+ /**
+ * Find a job - either scheduled or active.
+ * This method searches for an event with the given topic and filter properties. If more than one
+ * job matches, the first one found is returned which could be any of the matching jobs.
+ *
+ * @param topic Topic is required.
+ * @param template The map acts like a template. The searched job
+ * must match the template (AND query).
+ * @return An event or <code>null</code>
+ */
+ Event findJob(String topic, Map<String, Object> template);
+
+ /**
+ * Cancel this job.
+ * Cancelling a job might fail if the job is currently in processing.
+ * @param jobId The unique identifier as found in the property {@link JobUtil#JOB_ID}.
+ * @return <code>true</code> if the job could be cancelled or does not exist anymore.
+ * <code>false</code> otherwise.
+ */
+ boolean removeJob(String jobId);
+
+ /**
+ * Cancel this job.
+ * This method acts like {@link #removeJob(String)} with the exception that it waits
+ * for a job to finish. The job will be removed when this method returns - however
+ * this method blocks until the job is finished!
+ * @param jobId The unique identifier as found in the property {@link JobUtil#JOB_ID}.
+ */
+ void forceRemoveJob(String jobId);
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+import org.osgi.service.event.Event;
+
+/**
+ * A job processor processes a job in the background.
+ * It is used by {@link JobUtil#processJob(Event, JobProcessor)}.
+ * @since 3.0
+ */
+public interface JobProcessor {
+
+ /**
+ * Execute the job.
+ * If the job fails with a thrown exception/throwable, the job will not be rescheduled.
+ *
+ * @param job The event containing the job description.
+ * @return <code>true</code> if the job could be finished (either successfully or with an error).
+ * <code>false</code> if the job should be rescheduled.
+ */
+ boolean process(Event job);
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.impl.jobs.JobStatusNotifier;
+import org.apache.sling.event.impl.support.Environment;
+import org.osgi.service.event.Event;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The <code>JobUtil</code> class is a utility class for
+ * creating and processing jobs.
+ * @since 3.0
+ */
+public abstract class JobUtil {
+
+ /** The job topic property. */
+ public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
+
+ /** The property for the unique event name. Value is of type String (This is optional). */
+ public static final String PROPERTY_JOB_NAME = "event.job.id";
+
+ /** The property to set if a job can be run parallel to any other job.
+ * The following values are supported:
+ * - boolean value <code>true</code> and <code>false</code>
+ * - string value <code>true</code> and <code>false</code>
+ * - integer value higher than 1 - if this is specified jobs are run in
+ * parallel but never more than the specified number.
+ *
+ * We might want to use different values in the future for enhanced
+ * parallel job handling.
+ *
+ * This value is only used, if {@link JobUtil#PROPERTY_JOB_QUEUE_NAME} is
+ * specified and the referenced queue is not started yet.
+ */
+ public static final String PROPERTY_JOB_PARALLEL = "event.job.parallel";
+
+ /** The property to set if a job should only be run on the same application where it has been created. */
+ public static final String PROPERTY_JOB_RUN_LOCAL = "event.job.run.local";
+
+ /** The property to track the retry count for jobs. Value is of type Integer. */
+ public static final String PROPERTY_JOB_RETRY_COUNT = "event.job.retrycount";
+
+ /** The property for setting the maximum number of retries. Value is of type Integer. */
+ public static final String PROPERTY_JOB_RETRIES = "event.job.retries";
+
+ /** The property to set a retry delay. Value is of type Long and specifies milliseconds. */
+ public static final String PROPERTY_JOB_RETRY_DELAY = "event.job.retrydelay";
+
+ /** The property to set to put the jobs into a separate job queue. This property
+ * specifies the name of the job queue. If the job queue does not exist yet
+ * a new queue is created.
+ * If an ordered job queue is used, the jobs are never executed in parallel
+ * from this queue! For non ordered queues the {@link #PROPERTY_JOB_PARALLEL}
+ * with an integer value higher than 1 can be used to specify the maximum number
+ * of parallel jobs for this queue.
+ */
+ public static final String PROPERTY_JOB_QUEUE_NAME = "event.job.queuename";
+
+ /** If this property is set with any value, the queue processes the jobs in the same
+ * order as they have arrived.
+ * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified
+ * and the job queue has not been started yet.
+ */
+ public static final String PROPERTY_JOB_QUEUE_ORDERED = "event.job.queueordered";
+
+ /** This property allows to override the priority for the thread used to start this job.
+ * The property is evaluated by the {@link #processJob(Event, JobProcessor)} method.
+ * If another way of executing the job is used, it is up to the client to ensure
+ * the job priority.
+ * For possible values see {@link JobPriority}.
+ */
+ public static final String PROPERTY_JOB_PRIORITY = "event.job.priority";
+
+ /**
+ * The priority for jobs.
+ */
+ public enum JobPriority {
+ NORM,
+ MIN,
+ MAX
+ }
+
+ /** The topic for jobs. */
+ public static final String TOPIC_JOB = "org/apache/sling/event/job";
+
+ /**
+ * This is a unique identifier which can be used to cancel the job.
+ */
+ public static final String JOB_ID = "slingevent:eventId";
+
+ /**
+ * Notification events for jobs.
+ */
+
+ /** Asynchronous notification event when a job is started.
+ * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the
+ * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the
+ * timestamp of the event (as a Long).
+ */
+ public static final String TOPIC_JOB_STARTED = "org/apache/sling/event/notification/job/START";
+
+ /** Asynchronous notification event when a job is finished.
+ * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the
+ * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the
+ * timestamp of the event (as a Long).
+ */
+ public static final String TOPIC_JOB_FINISHED = "org/apache/sling/event/notification/job/FINISHED";
+
+ /** Asynchronous notification event when a job failed.
+ * If a job execution fails, it is rescheduled for another try.
+ * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the
+ * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the
+ * timestamp of the event (as a Long).
+ */
+ public static final String TOPIC_JOB_FAILED = "org/apache/sling/event/notification/job/FAILED";
+
+ /** Asynchronous notification event when a job is cancelled.
+ * If a job execution is cancelled it is not rescheduled.
+ * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the
+ * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the
+ * timestamp of the event (as a Long).
+ */
+ public static final String TOPIC_JOB_CANCELLED = "org/apache/sling/event/notification/job/CANCELLED";
+
+ /** Property containing the job event. */
+ public static final String PROPERTY_NOTIFICATION_JOB = "event.notification.job";
+
+ /**
+ * Is this a job event?
+ * This method checks if the event contains the {@link #PROPERTY_JOB_TOPIC}
+ * property.
+ * @param event The event to check.
+ * @return <code>true</code> if this is a job event.
+ */
+ public static boolean isJobEvent(final Event event) {
+ return event.getProperty(PROPERTY_JOB_TOPIC) != null;
+ }
+
+ /**
+ * Check if this is a job event and return the notifier context (<code>null</code> if it is not a job event).
+ * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+ */
+ private static JobStatusNotifier.NotifierContext getNotifierContext(final Event job) {
+ // check if this is a job event
+ if ( !isJobEvent(job) ) {
+ return null;
+ }
+ final JobStatusNotifier.NotifierContext ctx = (JobStatusNotifier.NotifierContext) job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
+ if ( ctx == null ) {
+ throw new IllegalArgumentException("JobStatusNotifier context is not available in event properties.");
+ }
+ return ctx;
+ }
+
+ /**
+ * Send an acknowledge.
+ * This signals the job handler that someone is starting to process the job. This method
+ * should be invoked as a first command during job processing.
+ * If this method returns <code>false</code> this means someone else is already
+ * processing this job, and the caller should not process the event anymore.
+ * @return Returns <code>true</code> if the acknowledge could be sent
+ * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+ */
+ public static boolean acknowledgeJob(final Event job) {
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ if ( !ctx.notifier.sendAcknowledge(job) ) {
+ // if we don't get an ack, someone else is already processing this job.
+ // the caller should not process this job, so we only log and return false.
+ LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Notify a finished job.
+ * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+ */
+ public static void finishedJob(final Event job) {
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ ctx.notifier.finishedJob(job, false);
+ }
+ }
+
+ /**
+ * Notify a failed job.
+ * @return <code>true</code> if the job has been rescheduled, <code>false</code> otherwise.
+ * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+ */
+ public static boolean rescheduleJob(final Event job) {
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ return ctx.notifier.finishedJob(job, true);
+ }
+ return false;
+ }
+
+ /**
+ * Process a job in the background and notify its success.
+ * This method also sends an acknowledge message to the job event handler.
+ * @throws IllegalArgumentException If the event is a job event but does not have a notifier context.
+ */
+ public static void processJob(final Event job, final JobProcessor processor) {
+ // first check for a notifier context to send an acknowledge
+ boolean notify = true;
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ if ( !ctx.notifier.sendAcknowledge(job) ) {
+ // if we don't get an ack, someone else is already processing this job.
+ // we process but do not notify the job event handler.
+ LoggerFactory.getLogger(JobUtil.class).info("Someone else is already processing job {}.", job);
+ notify = false;
+ }
+ }
+ final JobPriority priority = (JobPriority) job.getProperty(PROPERTY_JOB_PRIORITY);
+ final boolean notifyResult = notify;
+
+ final Runnable task = new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ final Thread currentThread = Thread.currentThread();
+ // update priority and name
+ final String oldName = currentThread.getName();
+ final int oldPriority = currentThread.getPriority();
+
+ currentThread.setName(oldName + "-" + job.getProperty(PROPERTY_JOB_QUEUE_NAME) + "(" + job.getProperty(PROPERTY_JOB_TOPIC) + ")");
+ if ( priority != null ) {
+ switch ( priority ) {
+ case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
+ break;
+ case MIN : currentThread.setPriority(Thread.MIN_PRIORITY);
+ break;
+ case MAX : currentThread.setPriority(Thread.MAX_PRIORITY);
+ break;
+ }
+ }
+ boolean result = false;
+ try {
+ result = processor.process(job);
+ } catch (Throwable t) { //NOSONAR
+ LoggerFactory.getLogger(JobUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
+ // we don't reschedule if an exception occurs
+ result = true;
+ } finally {
+ currentThread.setPriority(oldPriority);
+ currentThread.setName(oldName);
+ if ( notifyResult ) {
+ if ( result ) {
+ JobUtil.finishedJob(job);
+ } else {
+ JobUtil.rescheduleJob(job);
+ }
+ }
+ }
+ }
+
+ };
+ // check if the thread pool is available
+ final ThreadPool pool = Environment.THREAD_POOL;
+ if ( pool != null ) {
+ pool.execute(task);
+ } else {
+ // if we don't have a thread pool, we create the thread directly
+ // (this should never happen for jobs, but is a safe fallback and
+ // allows to call this method for other background processing).
+ new Thread(task).start();
+ }
+ }
+
+ private JobUtil() {
+ // avoid instantiation
+ }
+}
\ No newline at end of file
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobUtil.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+import java.util.Iterator;
+
+import org.osgi.service.event.Event;
+
+/**
+ * This <code>Iterator</code> allows to iterate over {@link Event}s.
+ * In addition to an iterator it might return the number of elements
+ * in the collection and allows to skip several elements.
+ * @since 3.0
+ */
+public interface JobsIterator extends Iterator<Event>, Iterable<Event> {
+
+ /**
+ * Skip a number of jobs.
+ * @param skipNum the non-negative number of elements to skip
+ * @throws java.util.NoSuchElementException
+ * if skipped past the last job in the iterator.
+ */
+ void skip(long skipNum);
+
+ /**
+ * Returns the total number of jobs. In some cases a precise information
+ * is not available. In these cases -1 is returned.
+ */
+ long getSize();
+
+ /**
+ * Returns the current position within the iterator. The number returned is
+ * the 0-based index of the next job.
+ */
+ long getPosition();
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+
+/**
+ * This is a job queue processing job events.
+ * @since 3.0
+ */
+public interface Queue {
+
+ /**
+ * Get the queue name.
+ */
+ String getName();
+
+ /**
+ * Return statistics information about this queue.
+ */
+ Statistics getStatistics();
+
+ /**
+ * Return some information about the current status of the queue.
+ */
+ String getStatusInfo();
+
+ /**
+ * Get the corresponding configuration.
+ */
+ QueueConfiguration getConfiguration();
+
+ /**
+ * Suspend the queue - when a queue is suspended it stops processing
+ * jobs - however already started jobs are finished (but not rescheduled).
+ * Depending on the queue implementation, the queue is only suspended
+ * for a specific time.
+ * A queue can be resumed with {@link #resume()}.
+ */
+ void suspend();
+
+ /**
+ * Resume a suspended queue (see {@link #suspend()}). If the queue is not
+ * suspended, calling this method has no effect.
+ * Depending on the queue implementation, if a job failed a job queue might
+ * sleep for a configured time, before a new job is processed. By calling this
+ * method, the job queue can be woken up and force an immediate reprocessing.
+ * This feature is only supported by ordered queues at the moment. If a queue
+ * does not support this feature, calling this method has only an effect if
+ * the queue is really suspended.
+ */
+ void resume();
+
+ /**
+ * Is the queue currently suspended?
+ */
+ boolean isSuspended();
+
+ /**
+ * Remove all outstanding jobs from the queue. This does not delete
+ * the jobs. The jobs are either processed by a different cluster node
+ * or on restart.
+ */
+ void clear();
+
+ /**
+ * Remove all outstanding jobs and delete them. This actually cancels
+ * all outstanding jobs (but no notifications are sent).
+ */
+ void removeAll();
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Queue.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+
+/**
+ * The configuration of a queue.
+ * @since 3.0
+ */
+public interface QueueConfiguration {
+
+ /** The queue type. */
+ static enum Type {
+ UNORDERED,
+ ORDERED,
+ TOPIC_ROUND_ROBIN,
+ IGNORE
+ }
+
+ /**
+ * Return the retry delay in ms
+ */
+ long getRetryDelayInMs();
+
+ /**
+ * Return the max number of retries, -1 for endless retry!
+ */
+ int getMaxRetries();
+
+ /**
+ * Return the queue type.
+ */
+ Type getType();
+
+ /**
+ * Return the thread priority for the job thread.
+ */
+ JobUtil.JobPriority getPriority();
+
+ /**
+ * Return the max number of parallel processes.
+ */
+ int getMaxParallel();
+
+ /**
+ * Is this a local running queue (= processing only
+ * jobs started on the same instance.)
+ */
+ boolean isLocalQueue();
+
+ /**
+ * Application ids - returns an array of application
+ * ids if this queue is bound to some cluster nodes.
+ */
+ String[] getApplicationIds();
+
+ /**
+ * The list of topics this queue is bound to.
+ */
+ String[] getTopics();
+
+ /**
+ * Get the ranking of this configuration.
+ */
+ int getRanking();
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.jobs;
+
+/**
+ * Statistic information.
+ * This information is not preserved between restarts of the service.
+ * Once a service is restarted, the counters start at zero!
+ * @since 3.0
+ */
+public interface Statistics {
+
+ /**
+ * The time this service has been started
+ */
+ long getStartTime();
+
+ /**
+ * Number of successfully finished jobs.
+ */
+ long getNumberOfFinishedJobs();
+
+ /**
+ * Number of permanently failing or cancelled jobs.
+ */
+ long getNumberOfCancelledJobs();
+
+ /**
+ * Number of failing jobs.
+ */
+ long getNumberOfFailedJobs();
+
+ /**
+ * Number of already processed jobs. This adds
+ * {@link #getNumberOfFinishedJobs()}, {@link #getNumberOfCancelledJobs()}
+ * and {@link #getNumberOfFailedJobs()}
+ */
+ long getNumberOfProcessedJobs();
+
+ /**
+ * Number of jobs currently in processing.
+ */
+ long getNumberOfActiveJobs();
+
+ /**
+ * Number of jobs currently waiting in a queue.
+ */
+ long getNumberOfQueuedJobs();
+
+ /**
+ * This just adds {@link #getNumberOfActiveJobs()} and {@link #getNumberOfQueuedJobs()}
+ */
+ long getNumberOfJobs();
+
+ /**
+ * The time a job has been started last.
+ */
+ long getLastActivatedJobTime();
+
+ /**
+ * The time a job has been finished/failed/cancelled last.
+ */
+ long getLastFinishedJobTime();
+
+ /**
+ * The average waiting time of a job in the queue.
+ */
+ long getAverageWaitingTime();
+
+ /**
+ * The average processing time of a job - this only counts finished jobs.
+ */
+ long getAverageProcessingTime();
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/jobs/Statistics.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties (added)
+++ sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties Mon Oct 11 06:54:12 2010
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# This file contains localization strings for configuration labels and
+# descriptions as used in the metatype.xml descriptor generated by the
+# SCR plugin
+
+#
+# Distributing Event Handler
+dist.events.name = Apache Sling Distributing Event Handler
+dist.events.description = Distributes local OSGi Event Admin events to \
+ other nodes of the same cluster. The events are written to the JCR \
+ repository for distribution to other nodes while events written to the \
+ repository are picked up and distributed locally through the OSGi Event Admin \
+ Service.
+
+scheduler.period.name = Cleanup Interval
+scheduler.period.description = Interval in seconds in which events older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 30 minutes (1800 seconds).
+
+cleanup.period.name = Event Cleanup Age
+cleanup.period.description = The maximum age in minutes of persisted events to \
+ be purged from the repository during the cleanup run. The default is 15 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository.
+
+
+#
+# Queue Configuration and Job Event Handler
+queue.name = Apache Sling Job Queue Configuration
+queue.description = The configuration of a job processing queue.
+
+queue.name.name = Name
+queue.name.description = The name of the queue. If matching is used \
+ the token \{0\} can be used to substitute the real value.
+
+queue.type.name = Type
+queue.type.description = The queue type.
+
+queue.topics.name = Topics
+queue.topics.description = This value is required and lists the topics processed by \
+ this queue. The value is a list of strings. If a string ends with a dot, \
+ all topics in exactly this package match. If the string ends with a star, \
+ all topics in this package and all subpackages match. If the string neither \
+ ends with a dot nor with a star, this is assumed to define an exact topic.
+
+queue.priority.name = Priority
+queue.priority.description = The priority for the threads from this queue. Default is norm.
+
+queue.retries.name = Maximum Retries
+queue.retries.description = The maximum number of times a failed job slated \
+ for retries is actually retried. If a job has been retried this number of \
+ times and still fails, it is not rescheduled and assumed to have failed. The \
+ default value is 10.
+
+queue.retrydelay.name = Retry Delay
+queue.retrydelay.description = The number of milliseconds to sleep between two \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 2 seconds. This value is only relevant if there is a single \
+ failed job in the queue. If there are multiple failed jobs, each job is \
+ retried in turn without an intervening delay.
+
+queue.maxparallel.name = Maximum Parallel Jobs
+queue.maxparallel.description = The maximum number of parallel jobs started for this queue. \
+ A value of -1 is substituted with the number of available processors.
+
+queue.runlocal.name = Run Local
+queue.runlocal.description = Jobs for this queue are only processed on the cluster node \
+ where the job has been started.
+
+queue.applicationids.name = Application Ids
+queue.applicationids.description = An optional list of application ids. If configured, \
+ jobs for this queue are only processed on those cluster nodes.
+
+
+#
+# Job Event Handler
+job.events.name = Apache Sling Job Event Handler
+job.events.description = Manages job scheduling on a single system as well \
+ as on a cluster. A Job runs only on a single cluster node. \
+ The respective scheduling is persisted in the repository and distributed \
+ amongst the cluster nodes through repository events. The jobs are started \
+ locally on a single cluster node through the OSGi Event Admin.
+
+jobscheduler.period.name = Cleanup Interval
+jobscheduler.period.description = Interval in seconds in which unused \
+ queues are stopped. The default value is 5 minutes (300 seconds).
+
+
+#
+# Persistence Handler
+job.persistence.name = Apache Sling Job Persistence Manager
+job.persistence.description = This service persists and loads jobs from the repository.
+
+persscheduler.period.name = Event Cleanup Interval
+persscheduler.period.description = Interval in seconds in which jobs older than \
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 5 minutes (300 seconds).
+
+sleep.time.name = Retry Interval
+sleep.time.description = The number of milliseconds to sleep between two \
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 30 seconds. This value is only relevant if there is a single \
+ failed job in the queue. If there are multiple failed jobs, each job is \
+ retried in turn without an intervening delay.
+
+jobcleanup.period.name = Event Cleanup Age
+jobcleanup.period.description = The maximum age in minutes of persisted job to \
+ be purged from the repository during the cleanup run. The default is 5 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository.
+
+max.load.jobs.name = Max Load Jobs
+max.load.jobs.description = The maximum amount of jobs being loaded from the repository on startup. \
+ Default is 1000 jobs.
+
+load.threshold.name = Load Threshold
+load.threshold.description = If the queue is lower than this threshold the repository is checked \
+ for events. The default value is 400. This works together with the maximum load jobs.
+
+load.delay.name = Background Load Delay
+load.delay.description = The background loader waits this time of seconds after startup before \
+ loading events from the repository. Default value is 30 seconds.
+
+load.checkdelay.name = Background Check Delay
+load.checkdelay.description = The background loader sleeps this time of seconds before \
+ checking the repository for jobs. Default value is 240 seconds.
+
+#
+# Event Pool
+event.pool.name = Apache Sling Event Thread Pool
+event.pool.description = This is the thread pool used by the Apache Sling eventing support.
+
+minPoolSize.name = Min Pool Size
+minPoolSize.description = The minimum pool size. The minimum pool size should be \
+ higher than 20. Approx 10 threads are in use by the system, so a pool size of 20 \
+ allows to process 10 events in parallel.
+
+maxPoolSize.name = Max Pool Size
+maxPoolSize.description = The maximum pool size. The maximum pool size should be higher than \
+ or equal to the minimum pool size.
+
+priority.name = Priority
+priority.description = The priority for the threads from this pool. Default is norm.
Propchange: sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties
------------------------------------------------------------------------------
svn:keywords = Id
Propchange: sling/branches/eventing-3.0/src/main/resources/OSGI-INF/metatype/metatype.properties
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd (added)
+++ sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd Mon Oct 11 06:54:12 2010
@@ -0,0 +1,42 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+<slingevent='http://sling.apache.org/jcr/event/1.0'>
+<nt='http://www.jcp.org/jcr/nt/1.0'>
+<mix='http://www.jcp.org/jcr/mix/1.0'>
+
+// Base node type of the slingevent namespace. It extends nt:unstructured and
+// nt:hierarchyNode and declares the topic, application and created items plus
+// a binary "properties" item.
+// NOTE(review): the binary item presumably carries the serialized event
+// properties - confirm against the code that writes these nodes.
+[slingevent:Event] > nt:unstructured, nt:hierarchyNode
+ - slingevent:topic (string)
+ - slingevent:application (string)
+ - slingevent:created (date)
+ - slingevent:properties (binary)
+
+// Job node type: a lockable slingevent:Event that additionally declares the
+// processor and id strings and a "finished" date.
+[slingevent:Job] > slingevent:Event, mix:lockable
+ - slingevent:processor (string)
+ - slingevent:id (string)
+ - slingevent:finished (date)
+
+// Timed event node type: a lockable slingevent:Event that additionally
+// declares the processor and id strings and three scheduling related items
+// (expression, date, period).
+// NOTE(review): presumably only one of expression / date / period is set per
+// node - confirm against the timed event handler.
+[slingevent:TimedEvent] > slingevent:Event, mix:lockable
+ - slingevent:processor (string)
+ - slingevent:id (string)
+ - slingevent:expression (string)
+ - slingevent:date (date)
+ - slingevent:period (long)
+
+
Propchange: sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/resources/SLING-INF/nodetypes/event.cnd
------------------------------------------------------------------------------
svn:keywords = Id
Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Properties;
+
+import javax.jcr.PropertyType;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.sling.event.impl.jobs.jcr.JCRHelper;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+
+/**
+ * Unit tests for the static helpers of {@link EventUtil}; the last test also
+ * exercises {@link JCRHelper#getNodePropertyValue}.
+ */
+@RunWith(JMock.class)
+public class EventUtilTest {
+
+    /** Mockery used to create the JCR value and value factory mocks. */
+    protected Mockery context = new JUnit4Mockery();
+
+    /**
+     * Events created through {@code createDistributableEvent} must be reported
+     * as distributable, plain events must not, and custom properties must be
+     * kept.
+     */
+    @Test
+    public void testDistributeFlag() {
+        final Event flagged = EventUtil.createDistributableEvent("some/topic", null);
+        assertTrue(EventUtil.shouldDistribute(flagged));
+
+        final Event plain = new Event("another/topic", (Dictionary<String, Object>)null);
+        assertFalse(EventUtil.shouldDistribute(plain));
+
+        final Dictionary<String, Object> custom = new Hashtable<String, Object>();
+        custom.put("a", "a");
+        custom.put("b", "b");
+        final Event flaggedWithProps = EventUtil.createDistributableEvent("some/topic", custom);
+        assertTrue(EventUtil.shouldDistribute(flaggedWithProps));
+
+        // expected names: the two custom entries, the distribution flag and the topic
+        final String[] propertyNames = flaggedWithProps.getPropertyNames();
+        assertEquals(4, propertyNames.length);
+        assertEquals("a", flaggedWithProps.getProperty("a"));
+        assertEquals("b", flaggedWithProps.getProperty("b"));
+    }
+
+    /**
+     * An event without the application property is expected to be local, an
+     * event carrying that property is expected not to be.
+     */
+    @Test
+    public void testLocalFlag() {
+        final Event withoutApplication = new Event("local/event", (Dictionary<String, Object>)null);
+        assertTrue(EventUtil.isLocal(withoutApplication));
+
+        final Properties applicationProps = new Properties();
+        applicationProps.put(EventUtil.PROPERTY_APPLICATION, "application1");
+        final Event withApplication = new Event("remote/event", (Dictionary<Object, Object>)applicationProps);
+        assertFalse(EventUtil.isLocal(withApplication));
+    }
+
+    /**
+     * Creates a mocked JCR value which answers {@code getType()} with the
+     * given type.
+     *
+     * @param type the property type the mock reports
+     * @param name the name under which the mock is registered
+     * @return the mocked value
+     */
+    protected Value getValueOfType(final int type, String name) {
+        final Value mocked = this.context.mock(Value.class, name);
+        this.context.checking(new Expectations() {{
+            allowing(mocked).getType();
+            will(returnValue(type));
+        }});
+        return mocked;
+    }
+
+    /**
+     * Checks which JCR property type is chosen for boolean, long, string and
+     * calendar objects, and that an int yields no value at all.
+     */
+    @Test
+    public void testGetNodePropertyValue() {
+        final ValueFactory factory = this.context.mock(ValueFactory.class);
+
+        // one typed value mock per factory method that may be invoked
+        final Value trueValue = getValueOfType(PropertyType.BOOLEAN, "booleanValue1");
+        final Value falseValue = getValueOfType(PropertyType.BOOLEAN, "booleanValue2");
+        final Value longValue = getValueOfType(PropertyType.LONG, "longValue");
+        final Value stringValue = getValueOfType(PropertyType.STRING, "stringValue");
+        final Value dateValue = getValueOfType(PropertyType.DATE, "dateValue");
+
+        this.context.checking(new Expectations() {{
+            allowing(factory).createValue(true);
+            will(returnValue(trueValue));
+            allowing(factory).createValue(false);
+            will(returnValue(falseValue));
+            allowing(factory).createValue(with(any(Long.class)));
+            will(returnValue(longValue));
+            allowing(factory).createValue(with(any(String.class)));
+            will(returnValue(stringValue));
+            allowing(factory).createValue(with(any(Calendar.class)));
+            will(returnValue(dateValue));
+        }});
+
+        // boolean, as primitive literal and as wrapper constant
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, true).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, false).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, Boolean.TRUE).getType());
+        assertEquals(PropertyType.BOOLEAN, JCRHelper.getNodePropertyValue(factory, Boolean.FALSE).getType());
+
+        // long is supported ...
+        assertEquals(PropertyType.LONG, JCRHelper.getNodePropertyValue(factory, 5L).getType());
+        // ... int is not and results in null
+        assertEquals(null, JCRHelper.getNodePropertyValue(factory, 5));
+
+        // string
+        assertEquals(PropertyType.STRING, JCRHelper.getNodePropertyValue(factory, "something").getType());
+
+        // calendar
+        assertEquals(PropertyType.DATE, JCRHelper.getNodePropertyValue(factory, Calendar.getInstance()).getType());
+    }
+}
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/EventUtilTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.Dictionary;
+
+import org.jmock.Expectations;
+import org.jmock.integration.junit4.JMock;
+import org.junit.runner.RunWith;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * Base class for tests of {@link AbstractRepositoryEventHandler}
+ * implementations. On top of the environment prepared by {@link AbstractTest}
+ * it creates the handler under test and activates / deactivates it with mocked
+ * component contexts.
+ */
+@RunWith(JMock.class)
+public abstract class AbstractRepositoryEventHandlerTest extends AbstractTest {
+
+    /** The handler under test; set in activate and cleared in deactivate. */
+    protected volatile AbstractRepositoryEventHandler handler;
+
+    /**
+     * Creates the handler instance to be tested.
+     *
+     * @return a new, not yet activated handler
+     */
+    protected abstract AbstractRepositoryEventHandler createHandler();
+
+    /**
+     * Activates the environment, then creates and activates the handler.
+     *
+     * @param ea the event admin to use, passed on to the super class
+     * @throws Throwable if the activation fails
+     */
+    protected void activate(final EventAdmin ea) throws Throwable {
+        super.activate(ea);
+
+        this.handler = this.createHandler();
+        this.handler.environment = this.environment;
+
+        // mocked bundle context, handed out by the component context below
+        final BundleContext mockedBundleContext =
+            this.getMockery().mock(BundleContext.class, "beforeBundleContext" + activateCount);
+
+        // the component configuration
+        final Dictionary<String, Object> configuration = this.getComponentConfig();
+
+        // mocked component context exposing bundle context and configuration
+        final ComponentContext mockedComponentContext =
+            this.getMockery().mock(ComponentContext.class, "beforeComponentContext" + activateCount);
+        this.getMockery().checking(new Expectations() {{
+            allowing(mockedComponentContext).getBundleContext();
+            will(returnValue(mockedBundleContext));
+            allowing(mockedComponentContext).getProperties();
+            will(returnValue(configuration));
+        }});
+
+        this.handler.activate(mockedComponentContext);
+
+        // the session is initialized in the background, so wait two seconds
+        try {
+            Thread.sleep(2000);
+        } catch (InterruptedException ignored) {
+            // an interrupted wait is deliberately not treated as a failure
+        }
+    }
+
+    /**
+     * Deactivates the handler with a freshly mocked component context, drops
+     * the handler reference and finally deactivates the environment.
+     *
+     * @throws Throwable if the deactivation fails
+     */
+    protected void deactivate() throws Throwable {
+        final BundleContext mockedBundleContext =
+            this.getMockery().mock(BundleContext.class, "afterBundleContext" + activateCount);
+        final ComponentContext mockedComponentContext =
+            this.getMockery().mock(ComponentContext.class, "afterComponentContext" + activateCount);
+
+        this.getMockery().checking(new Expectations() {{
+            allowing(mockedComponentContext).getBundleContext();
+            will(returnValue(mockedBundleContext));
+        }});
+
+        this.handler.deactivate(mockedComponentContext);
+        this.handler = null;
+
+        super.deactivate();
+    }
+}
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.Set;
+
+import javax.jcr.Node;
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+
+import junitx.util.PrivateAccessor;
+
+import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
+import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.commons.threads.ThreadPoolConfig;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.apache.sling.settings.SlingSettingsService;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.junit.runner.RunWith;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * Base class for the repository based event tests. It starts a test
+ * repository once per test class, and before each test it builds an
+ * {@link EnvironmentComponent} whose service fields are injected reflectively
+ * with stubs and mocks. After each test the environment is deactivated and
+ * the children of {@link #REPO_PATH} are removed.
+ */
+@RunWith(JMock.class)
+public abstract class AbstractTest {
+
+    /** Repository path put into the component configuration. */
+    protected static final String REPO_PATH = "/test/events";
+
+    /** Sling id returned by the stubbed settings service. */
+    protected static final String SLING_ID = "4711";
+
+    /** Administrative session opened in setupRepository and closed in shutdownRepository. */
+    protected static Session session;
+
+    /** The environment created by the last call to activate. */
+    protected EnvironmentComponent environment;
+
+    /** Counter appended to mock names; incremented by every deactivate. */
+    protected int activateCount = 1;
+
+    /**
+     * Provides the mockery of the concrete test class.
+     *
+     * @return the mockery used to create mocks and register expectations
+     */
+    protected abstract Mockery getMockery();
+
+    /**
+     * Builds the component configuration for the handler under test.
+     *
+     * @return a new table containing only the repository path
+     */
+    protected Hashtable<String, Object> getComponentConfig() {
+        final Hashtable<String, Object> configuration = new Hashtable<String, Object>();
+        configuration.put(AbstractRepositoryEventHandler.CONFIG_PROPERTY_REPO_PATH, REPO_PATH);
+        return configuration;
+    }
+
+    /**
+     * Starts the repository, logs in administratively, registers the event
+     * and folder node types and removes a left-over test path.
+     *
+     * @throws Exception if the repository cannot be set up
+     */
+    @org.junit.BeforeClass
+    public static void setupRepository() throws Exception {
+        RepositoryTestUtil.startRepository();
+        final SlingRepository slingRepository = RepositoryTestUtil.getSlingRepository();
+        session = slingRepository.loginAdministrative(slingRepository.getDefaultWorkspace());
+
+        final String[] nodeTypeResources = {
+            "/SLING-INF/nodetypes/event.cnd",
+            "/SLING-INF/nodetypes/folder.cnd"
+        };
+        for ( final String resource : nodeTypeResources ) {
+            assertTrue(RepositoryTestUtil.registerNodeType(session,
+                    DistributingEventHandler.class.getResourceAsStream(resource)));
+        }
+
+        if ( !session.itemExists(REPO_PATH) ) {
+            return;
+        }
+        session.getItem(REPO_PATH).remove();
+        session.save();
+    }
+
+    /**
+     * Logs out the shared session (if any) and stops the repository.
+     *
+     * @throws Exception if the repository cannot be stopped
+     */
+    @org.junit.AfterClass
+    public static void shutdownRepository() throws Exception {
+        final Session current = session;
+        if ( current != null ) {
+            current.logout();
+            session = null;
+        }
+        RepositoryTestUtil.stopRepository();
+    }
+
+    /**
+     * Activates the environment with a mocked event admin before each test.
+     *
+     * @throws Throwable if the activation fails
+     */
+    @org.junit.Before
+    public void setup() throws Throwable {
+        this.activate(null);
+    }
+
+    /**
+     * Creates a new environment, injects repository, class loader manager,
+     * event admin, settings service and thread pool into its private fields
+     * and activates it.
+     *
+     * @param ea the event admin to inject; if {@code null} a mock accepting
+     *           any posted or sent event is used instead
+     * @throws Throwable if injection or activation fails
+     */
+    protected void activate(final EventAdmin ea) throws Throwable {
+        this.environment = new EnvironmentComponent();
+
+        PrivateAccessor.setField(this.environment, "repository", RepositoryTestUtil.getSlingRepository());
+        PrivateAccessor.setField(this.environment, "classLoaderManager", new DynamicClassLoaderManager() {
+
+            public ClassLoader getDynamicClassLoader() {
+                // class loader of this anonymous stub
+                return getClass().getClassLoader();
+            }
+        });
+
+        // the event admin: either the supplied one or a permissive mock
+        final EventAdmin adminToInject;
+        if ( ea == null ) {
+            final EventAdmin mockedAdmin = this.getMockery().mock(EventAdmin.class, "eventAdmin" + activateCount);
+            this.getMockery().checking(new Expectations() {{
+                allowing(mockedAdmin).postEvent(with(any(Event.class)));
+                allowing(mockedAdmin).sendEvent(with(any(Event.class)));
+            }});
+            adminToInject = mockedAdmin;
+        } else {
+            adminToInject = ea;
+        }
+        PrivateAccessor.setField(this.environment, "eventAdmin", adminToInject);
+
+        // sling settings service stub: fixed id, no home, no run modes
+        PrivateAccessor.setField(this.environment, "settingsService", new SlingSettingsService() {
+
+            public String getSlingId() {
+                return SLING_ID;
+            }
+
+            public URL getSlingHome() {
+                return null;
+            }
+
+            public String getSlingHomePath() {
+                return null;
+            }
+
+            public Set<String> getRunModes() {
+                return Collections.<String>emptySet();
+            }
+        });
+
+        // thread pool stub
+        PrivateAccessor.setField(this.environment, "threadPool", new ThreadPoolImpl());
+
+        this.environment.activate();
+    }
+
+    /**
+     * Deactivates and drops the environment and advances the activation
+     * counter.
+     *
+     * @throws Throwable if the deactivation fails
+     */
+    protected void deactivate() throws Throwable {
+        this.environment.deactivate();
+        this.environment = null;
+        activateCount++;
+    }
+
+    /**
+     * Replaces the event admin of the current environment.
+     *
+     * @param ea the event admin to inject
+     * @throws Exception if the field cannot be set
+     */
+    protected void setEventAdmin(final EventAdmin ea) throws Exception {
+        PrivateAccessor.setField(this.environment, "eventAdmin", ea);
+    }
+
+    /**
+     * Deactivates the environment after each test and removes all children
+     * of the test path. Repository errors during the clean up are ignored.
+     *
+     * @throws Throwable if the deactivation fails
+     */
+    @org.junit.After
+    public void shutdown() throws Throwable {
+        this.deactivate();
+        try {
+            // remove every child node to get a clean repository again
+            final Node testRoot = (Node) session.getItem(REPO_PATH);
+            for ( final NodeIterator children = testRoot.getNodes(); children.hasNext(); ) {
+                children.nextNode().remove();
+            }
+            session.save();
+        } catch ( RepositoryException ignored ) {
+            // a failing clean up is not relevant for the test result
+        }
+    }
+
+    /** Verifies that the configured repository path exists after activation. */
+    @org.junit.Test
+    public void testPathCreation() throws RepositoryException {
+        assertTrue(session.itemExists(REPO_PATH));
+    }
+
+    /** Thread pool stub which runs every task in a new thread. */
+    final class ThreadPoolImpl implements ThreadPool {
+
+        public void execute(Runnable runnable) {
+            new Thread(runnable).start();
+        }
+
+        public String getName() {
+            return "default";
+        }
+
+        public ThreadPoolConfig getConfiguration() {
+            return new ModifiableThreadPoolConfig();
+        }
+
+    }
+}
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/AbstractTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java (added)
+++ sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simplified version of the cyclic barrier class for testing.
+ * <p>
+ * The {@code block} methods wrap {@code await} and translate its checked
+ * exceptions: a broken barrier or a timeout is swallowed, while an
+ * interruption is recorded again on the calling thread so that callers can
+ * still observe it.
+ */
+public class Barrier extends CyclicBarrier {
+
+    /**
+     * Creates a barrier which trips once the given number of parties wait.
+     *
+     * @param parties the number of threads that must call a block method
+     */
+    public Barrier(int parties) {
+        super(parties);
+    }
+
+    /**
+     * Waits without time limit until all parties have arrived. Returns
+     * silently if the barrier is broken; if the calling thread is interrupted
+     * its interrupt status is restored before returning.
+     */
+    public void block() {
+        try {
+            this.await();
+        } catch (InterruptedException e) {
+            // await() cleared the flag when throwing - put it back
+            Thread.currentThread().interrupt();
+        } catch (BrokenBarrierException ignored) {
+            // a broken barrier simply ends the wait
+        }
+    }
+
+    /**
+     * Waits at most the given number of seconds until all parties have
+     * arrived. If the wait fails for any reason the barrier is reset so that
+     * it can be used again.
+     *
+     * @param seconds the maximum time to wait, in seconds
+     * @return {@code true} if the barrier tripped in time, {@code false} on
+     *         timeout, broken barrier or interruption (in the latter case the
+     *         interrupt status of the calling thread is restored)
+     */
+    public boolean block(int seconds) {
+        try {
+            this.await(seconds, TimeUnit.SECONDS);
+            return true;
+        } catch (InterruptedException e) {
+            // await() cleared the flag when throwing - put it back
+            Thread.currentThread().interrupt();
+        } catch (BrokenBarrierException ignored) {
+            // handled by the reset below
+        } catch (TimeoutException ignored) {
+            // handled by the reset below
+        }
+        this.reset();
+        return false;
+    }
+}
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/test/java/org/apache/sling/event/impl/Barrier.java
------------------------------------------------------------------------------
svn:mime-type = text/plain