You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/11 08:54:14 UTC

svn commit: r1021247 [3/6] - in /sling/branches/eventing-3.0: ./ .settings/ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/...

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.osgi.service.event.Event;
+
+public interface JobStatusNotifier {
+
+    String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();
+
+    class NotifierContext {
+        public final JobStatusNotifier notifier;
+
+        public NotifierContext(JobStatusNotifier n) {
+            this.notifier = n;
+        }
+    }
+
+    /**
+     * Send an acknowledge message that someone is processing the job.
+     * @param job The job.
+     * @return <code>true</code> if the ack is ok, <code>false</code> otherwise (e.g. if
+     *   someone else already send an ack for this job.
+     */
+    boolean sendAcknowledge(Event job);
+
+    /**
+     * Notify that the job is finished.
+     * If the job is not rescheduled, a return value of <code>false</code> indicates an error
+     * during the processing. If the job should be rescheduled, <code>true</code> indicates
+     * that the job could be rescheduled. If an error occurs or the number of retries is
+     * exceeded, <code>false</code> will be returned.
+     * @param job The job.
+     * @param reschedule Should the event be rescheduled?
+     * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
+     */
+    boolean finishedJob(Event job, boolean reschedule);
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.sling.event.jobs.JobsIterator;
+import org.osgi.service.event.Event;
+
+/**
+ * Jobs iterator.
+ */
+public class JobsIteratorImpl implements JobsIterator {
+
+    /** The events list size. */
+    private final long size;
+
+    /** The current position. */
+    private int index = 0;
+
+    /** The iterator. */
+    private final Iterator<Event> iter;
+
+    public JobsIteratorImpl(final List<Event> events) {
+        this.size = events.size();
+        this.iter = events.iterator();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobsIterator#getPosition()
+     */
+    public long getPosition() {
+        return this.index;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobsIterator#getSize()
+     */
+    public long getSize() {
+        return this.size;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.JobsIterator#skip(long)
+     */
+    public void skip(long skipNum) {
+        if ( skipNum < 0 ) {
+            throw new IllegalArgumentException();
+        }
+        if ( this.index + skipNum >= this.size ) {
+            throw new NoSuchElementException();
+        }
+        for(long i=0; i<skipNum; i++) {
+            this.iter.next();
+        }
+        this.index += skipNum;
+    }
+
+    /**
+     * @see java.util.Iterator#hasNext()
+     */
+    public boolean hasNext() {
+        return this.iter.hasNext();
+    }
+
+    /**
+     * @see java.util.Iterator#next()
+     */
+    public Event next() {
+        return this.iter.next();
+    }
+
+    /**
+     * @see java.util.Iterator#remove()
+     */
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @see java.lang.Iterable#iterator()
+     */
+    public Iterator<Event> iterator() {
+        return this;
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.apache.sling.event.jobs.Statistics;
+
+/**
+ * Implementation of the statistics.
+ */
+public class StatisticsImpl implements Statistics {
+
+    private final long startTime;
+
+    private volatile long activeJobs;
+
+    private volatile long queuedJobs;
+
+    private volatile long lastActivated = -1;
+
+    private volatile long lastFinished = -1;
+
+    private volatile long averageWaitingTime;
+
+    private volatile long averageProcessingTime;
+
+    private volatile long waitingTime;
+
+    private volatile long processingTime;
+
+    private volatile long waitingCount;
+
+    private volatile long processingCount;
+
+    private volatile long finishedJobs;
+
+    private volatile long failedJobs;
+
+    private volatile long cancelledJobs;
+
+    public StatisticsImpl() {
+        this.startTime = System.currentTimeMillis();
+    }
+
+    public StatisticsImpl(final long startTime) {
+        this.startTime = startTime;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getStartTime()
+     */
+    public long getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfProcessedJobs()
+     */
+    public synchronized long getNumberOfProcessedJobs() {
+        return getNumberOfCancelledJobs() + getNumberOfFailedJobs() + getNumberOfFinishedJobs();
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfActiveJobs()
+     */
+    public synchronized long getNumberOfActiveJobs() {
+        return activeJobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfQueuedJobs()
+     */
+    public synchronized long getNumberOfQueuedJobs() {
+        return queuedJobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfJobs()
+     */
+    public synchronized long getNumberOfJobs() {
+        return activeJobs + queuedJobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getAverageWaitingTime()
+     */
+    public synchronized long getAverageWaitingTime() {
+        return averageWaitingTime;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getAverageProcessingTime()
+     */
+    public synchronized long getAverageProcessingTime() {
+        return averageProcessingTime;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getNumberOfFinishedJobs()
+     */
+    public synchronized long getNumberOfFinishedJobs() {
+        return finishedJobs;
+    }
+
+    public synchronized long getNumberOfCancelledJobs() {
+        return cancelledJobs;
+    }
+
+    public synchronized long getNumberOfFailedJobs() {
+        return failedJobs;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getLastActivatedJobTime()
+     */
+    public synchronized long getLastActivatedJobTime() {
+        return this.lastActivated;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.Statistics#getLastFinishedJobTime()
+     */
+    public synchronized long getLastFinishedJobTime() {
+        return this.lastFinished;
+    }
+
+    /**
+     * Add a finished job
+     */
+    public synchronized void finishedJob(final long time) {
+        this.lastFinished = System.currentTimeMillis();
+        this.processingTime += time;
+        this.processingCount++;
+        this.averageProcessingTime = this.processingTime / this.processingCount;
+        this.finishedJobs++;
+        this.activeJobs--;
+    }
+
+    public synchronized void failedJob() {
+        this.failedJobs++;
+        this.activeJobs--;
+        this.queuedJobs++;
+    }
+
+    public synchronized void cancelledJob() {
+        this.cancelledJobs++;
+        this.activeJobs--;
+    }
+
+    /**
+     * New job in the qeue
+     */
+    public synchronized void incQueued() {
+        this.queuedJobs++;
+    }
+
+    /**
+     * Job not processed by us
+     */
+    public synchronized void decQueued() {
+        this.queuedJobs--;
+    }
+
+    /**
+     * Clear
+     */
+    public synchronized void clearQueued() {
+        this.queuedJobs = 0;
+    }
+
+    /**
+     * Add a job from the queue to status active
+     * @param time The time the job stayed in the queue.
+     */
+    public synchronized void addActive(final long time) {
+        this.queuedJobs--;
+        this.activeJobs++;
+        this.waitingCount++;
+        this.waitingTime += time;
+        this.averageWaitingTime = this.waitingTime / this.waitingCount;
+        this.lastActivated = System.currentTimeMillis();
+    }
+
+    public synchronized void add(final StatisticsImpl other) {
+        synchronized ( other ) {
+            this.queuedJobs += other.queuedJobs;
+
+            if ( other.lastActivated > this.lastActivated ) {
+                this.lastActivated = other.lastActivated;
+            }
+            if ( other.lastFinished > this.lastFinished ) {
+                this.lastFinished = other.lastFinished;
+            }
+            this.waitingTime += other.waitingTime;
+            this.waitingCount += other.waitingCount;
+            this.averageWaitingTime = this.waitingTime / this.waitingCount;
+            this.processingTime += other.processingTime;
+            this.processingCount += other.processingCount;
+            this.averageProcessingTime = this.processingTime / this.processingCount;
+            this.finishedJobs += other.finishedJobs;
+            this.failedJobs += other.failedJobs;
+            this.cancelledJobs += other.cancelledJobs;
+        }
+    }
+
+    public synchronized StatisticsImpl copy() {
+        final StatisticsImpl other = new StatisticsImpl();
+        other.queuedJobs = this.queuedJobs;
+        other.lastActivated = this.lastActivated;
+        other.lastFinished = this.lastFinished;
+        other.averageWaitingTime = this.averageWaitingTime;
+        other.averageProcessingTime = this.averageProcessingTime;
+        other.waitingTime = this.waitingTime;
+        other.processingTime = this.processingTime;
+        other.waitingCount = this.waitingCount;
+        other.processingCount = this.processingCount;
+        other.finishedJobs = this.finishedJobs;
+        other.failedJobs = this.failedJobs;
+        other.cancelledJobs = this.cancelledJobs;
+        return other;
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.UUID;
+
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+
+public class Utility {
+
+    /** Allowed characters for a node name */
+    private static final String ALLOWED_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+*#!ยค$%&()=[]?";
+    /** Replacement characters for unallowed characters in a node name */
+    private static final char REPLACEMENT_CHAR = '_';
+
+    /**
+     * Filter the node name for not allowed characters and replace them.
+     * @param nodeName The suggested node name.
+     * @return The filtered node name.
+     */
+    public static String filter(final String nodeName) {
+        final StringBuilder sb  = new StringBuilder();
+        char lastAdded = 0;
+
+        for(int i=0; i < nodeName.length(); i++) {
+            final char c = nodeName.charAt(i);
+            char toAdd = c;
+
+            if (ALLOWED_CHARS.indexOf(c) < 0) {
+                if (lastAdded == REPLACEMENT_CHAR) {
+                    // do not add several _ in a row
+                    continue;
+                }
+                toAdd = REPLACEMENT_CHAR;
+
+            } else if(i == 0 && Character.isDigit(c)) {
+                sb.append(REPLACEMENT_CHAR);
+            }
+
+            sb.append(toAdd);
+            lastAdded = toAdd;
+        }
+
+        if (sb.length()==0) {
+            sb.append(REPLACEMENT_CHAR);
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * used for the md5
+     */
+    public static final char[] hexTable = "0123456789abcdef".toCharArray();
+
+    /**
+     * Calculate an MD5 hash of the string given using 'utf-8' encoding.
+     *
+     * @param data the data to encode
+     * @return a hex encoded string of the md5 digested input
+     */
+    public static String md5(String data) {
+        try {
+            return digest("MD5", data.getBytes("utf-8"));
+        } catch (NoSuchAlgorithmException e) {
+            throw new InternalError("MD5 digest not available???");
+        } catch (UnsupportedEncodingException e) {
+            throw new InternalError("UTF8 digest not available???");
+        }
+    }
+
+    /**
+     * Digest the plain string using the given algorithm.
+     *
+     * @param algorithm The alogrithm for the digest. This algorithm must be
+     *                  supported by the MessageDigest class.
+     * @param data      the data to digest with the given algorithm
+     * @return The digested plain text String represented as Hex digits.
+     * @throws java.security.NoSuchAlgorithmException if the desired algorithm is not supported by
+     *                                  the MessageDigest class.
+     */
+    private static String digest(String algorithm, byte[] data)
+    throws NoSuchAlgorithmException {
+        MessageDigest md = MessageDigest.getInstance(algorithm);
+        byte[] digest = md.digest(data);
+        StringBuilder res = new StringBuilder(digest.length * 2);
+        for (int i = 0; i < digest.length; i++) {
+            byte b = digest[i];
+            res.append(hexTable[(b >> 4) & 15]);
+            res.append(hexTable[b & 15]);
+        }
+        return res.toString();
+    }
+
+
+    /**
+     * Create a unique node path (folder and name) for the job.
+     */
+    public static String getUniquePath(final String jobTopic, final String jobId) {
+        final StringBuilder sb = new StringBuilder(jobTopic.replace('/', '.'));
+        sb.append('/');
+        if ( jobId != null ) {
+            // we create an md from the job id - we use the first 6 bytes to
+            // create sub directories
+            final String md5 = md5(jobId);
+            sb.append(md5.substring(0, 2));
+            sb.append('/');
+            sb.append(md5.substring(2, 4));
+            sb.append('/');
+            sb.append(md5.substring(4, 6));
+            sb.append('/');
+            sb.append(filter(jobId));
+        } else {
+            // create a path from the uuid - we use the first 6 bytes to
+            // create sub directories
+            final String uuid = UUID.randomUUID().toString();
+            sb.append(uuid.substring(0, 2));
+            sb.append('/');
+            sb.append(uuid.substring(2, 4));
+            sb.append('/');
+            sb.append(uuid.substring(5, 7));
+            sb.append("/Job_");
+            sb.append(uuid.substring(8, 17));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Helper method for sending the notification events.
+     */
+    public static void sendNotification(final EnvironmentComponent environment,
+            final String topic,
+            final Event job) {
+        final EventAdmin localEA = environment.getEventAdmin();
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(JobUtil.PROPERTY_NOTIFICATION_JOB, job);
+        props.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
+        localEA.postEvent(new Event(topic, props));
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+/**
+ * Constants for the queue configuration.
+ */
+public abstract class ConfigurationConstants {
+
+    public static final int NUMBER_OF_PROCESSORS = Runtime.getRuntime().availableProcessors();
+
+    public static final String DEFAULT_TYPE = "UNORDERED";
+    public static final String DEFAULT_PRIORITY = "NORM";
+    public static final boolean DEFAULT_RUN_LOCAL = false;
+    public static final int DEFAULT_RETRIES = 10;
+    public static final long DEFAULT_RETRY_DELAY = 2000;
+    public static final int DEFAULT_MAX_PARALLEL = 15;
+
+    public static final String PROP_NAME = "queue.name";
+    public static final String PROP_TYPE = "queue.type";
+    public static final String PROP_TOPICS = "queue.topics";
+    public static final String PROP_MAX_PARALLEL = "queue.maxparallel";
+    public static final String PROP_RETRIES = "queue.retries";
+    public static final String PROP_RETRY_DELAY = "queue.retrydelay";
+    public static final String PROP_PRIORITY = "queue.priority";
+    public static final String PROP_RUN_LOCAL = "queue.runlocal";
+    public static final String PROP_APP_IDS = "queue.applicationids";
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.PropertyUnbounded;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.osgi.framework.Constants;
+import org.osgi.service.event.Event;
+
+@Component(metatype=true,name="org.apache.sling.event.jobs.QueueConfiguration",
+        label="%queue.name", description="%queue.description",
+        configurationFactory=true,policy=ConfigurationPolicy.REQUIRE)
+@Service(value=InternalQueueConfiguration.class)
+@Properties({
+    @Property(name=ConfigurationConstants.PROP_NAME),
+    @Property(name=ConfigurationConstants.PROP_TYPE,
+            value=ConfigurationConstants.DEFAULT_TYPE,
+            options={@PropertyOption(name="UNORDERED",value="Parallel"),
+                     @PropertyOption(name="ORDERED",value="Ordered"),
+                     @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin"),
+                     @PropertyOption(name="IGNORE",value="Ignore")}),
+    @Property(name=ConfigurationConstants.PROP_TOPICS,
+            unbounded=PropertyUnbounded.ARRAY),
+    @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
+            intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL),
+    @Property(name=ConfigurationConstants.PROP_RETRIES,
+            intValue=ConfigurationConstants.DEFAULT_RETRIES),
+    @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
+            longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+    @Property(name=ConfigurationConstants.PROP_PRIORITY,
+            value=ConfigurationConstants.DEFAULT_PRIORITY,
+            options={@PropertyOption(name="NORM",value="Norm"),
+                     @PropertyOption(name="MIN",value="Min"),
+                     @PropertyOption(name="MAX",value="Max")}),
+    @Property(name=ConfigurationConstants.PROP_RUN_LOCAL,
+            boolValue=ConfigurationConstants.DEFAULT_RUN_LOCAL),
+    @Property(name=ConfigurationConstants.PROP_APP_IDS,
+            unbounded=PropertyUnbounded.ARRAY)
+})
+public class InternalQueueConfiguration
+    implements QueueConfiguration {
+
+    /** The name of the queue. */
+    private String name;
+
+    /** The queue type. */
+    private Type type;
+
+    /** Number of retries. */
+    private int retries;
+
+    /** Retry delay. */
+    private long retryDelay;
+
+    /** Local queue? */
+    private boolean runLocal;
+
+    /** Thread priority. */
+    private JobUtil.JobPriority priority;
+
+    /** The maximum number of parallel processes (for non ordered queues) */
+    private int maxParallelProcesses;
+
+    /** Optional application ids where this queue is running on. */
+    private String[] applicationIds;
+
+    /** The ordering. */
+    private int serviceRanking;
+
+    /** The matchers for topics. */
+    private Matcher[] matchers;
+
+    /** The configured topics. */
+    private String[] topics;
+
+    /** Valid flag. */
+    private boolean valid = false;
+
+    private String pid;
+
+    /**
+     * Create a new configuration from a config
+     */
+    public static InternalQueueConfiguration fromConfiguration(final Map<String, Object> params) {
+        final InternalQueueConfiguration c = new InternalQueueConfiguration();
+        c.activate(params);
+        return c;
+    }
+
+    public InternalQueueConfiguration() {
+        // nothing to do, see activate
+    }
+
+    /**
+     * Create a new queue configuration
+     */
+    @Activate
+    protected void activate(final Map<String, Object> params) {
+        this.name = OsgiUtil.toString(params.get(ConfigurationConstants.PROP_NAME), null);
+        this.priority = JobUtil.JobPriority.valueOf(OsgiUtil.toString(params.get(ConfigurationConstants.PROP_PRIORITY), ConfigurationConstants.DEFAULT_PRIORITY));
+        this.type = Type.valueOf(OsgiUtil.toString(params.get(ConfigurationConstants.PROP_TYPE), ConfigurationConstants.DEFAULT_TYPE));
+        this.runLocal = OsgiUtil.toBoolean(params.get(ConfigurationConstants.PROP_RUN_LOCAL), ConfigurationConstants.DEFAULT_RUN_LOCAL);
+        this.retries = OsgiUtil.toInteger(params.get(ConfigurationConstants.PROP_RETRIES), ConfigurationConstants.DEFAULT_RETRIES);
+        this.retryDelay = OsgiUtil.toLong(params.get(ConfigurationConstants.PROP_RETRY_DELAY), ConfigurationConstants.DEFAULT_RETRY_DELAY);
+        final int maxParallel = OsgiUtil.toInteger(params.get(ConfigurationConstants.PROP_MAX_PARALLEL), ConfigurationConstants.DEFAULT_MAX_PARALLEL);
+        this.maxParallelProcesses = (maxParallel == -1 ? ConfigurationConstants.NUMBER_OF_PROCESSORS : maxParallel);
+        final String appIds[] = OsgiUtil.toStringArray(params.get(ConfigurationConstants.PROP_APP_IDS));
+        if ( appIds == null
+             || appIds.length == 0
+             || (appIds.length == 1 && (appIds[0] == null || appIds[0].length() == 0)) ) {
+            this.applicationIds = null;
+        } else {
+            this.applicationIds = appIds;
+        }
+        final String[] topics = OsgiUtil.toStringArray(params.get(ConfigurationConstants.PROP_TOPICS));
+        if ( topics == null
+             || topics.length == 0
+             || (topics.length == 1 && (topics[0] == null || topics[0].length() == 0))) {
+            matchers = null;
+            this.topics = null;
+        } else {
+            final Matcher[] newMatchers = new Matcher[topics.length];
+            for(int i=0; i < topics.length; i++) {
+                String value = topics[i];
+                if ( value != null ) {
+                    value = value.trim();
+                }
+                if ( value != null && value.length() > 0 ) {
+                    if ( value.endsWith(".") ) {
+                        newMatchers[i] = new PackageMatcher(value.substring(0, value.length() - 1));
+                    } else if ( value.endsWith("*") ) {
+                        newMatchers[i] = new SubPackageMatcher(value.substring(0, value.length() - 1));
+                    } else {
+                        newMatchers[i] = new ClassMatcher(value);
+                    }
+                }
+            }
+            matchers = newMatchers;
+            this.topics = topics;
+        }
+        this.serviceRanking = OsgiUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
+        this.pid = (String)params.get(Constants.SERVICE_PID);
+        this.valid = this.checkIsValid();
+    }
+
+    public InternalQueueConfiguration(final Event jobEvent) {
+        this.name = (String)jobEvent.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
+        if ( jobEvent.getProperty(JobUtil.PROPERTY_JOB_QUEUE_ORDERED) != null ) {
+            this.type = Type.ORDERED;
+            this.maxParallelProcesses = 1;
+        } else {
+            this.type = Type.UNORDERED;
+            int maxPar = ConfigurationConstants.DEFAULT_MAX_PARALLEL;
+            final Object value = jobEvent.getProperty(JobUtil.PROPERTY_JOB_PARALLEL);
+            if ( value != null ) {
+                if ( value instanceof Boolean ) {
+                    final boolean result = ((Boolean)value).booleanValue();
+                    if ( !result ) {
+                        maxPar = 1;
+                    }
+                } else if ( value instanceof Number ) {
+                    final int result = ((Number)value).intValue();
+                    if ( result > 1 ) {
+                        maxPar = result;
+                    } else {
+                        maxPar = 1;
+                    }
+                } else {
+                    final String strValue = value.toString();
+                    if ( "no".equalsIgnoreCase(strValue) || "false".equalsIgnoreCase(strValue) ) {
+                        maxPar = 1;
+                    } else {
+                        // check if this is a number
+                        try {
+                            final int result = Integer.valueOf(strValue).intValue();
+                            if ( result > 1 ) {
+                                maxPar = result;
+                            } else {
+                                maxPar = 1;
+                            }
+                        } catch (NumberFormatException ne) {
+                            // we ignore this
+                        }
+                    }
+                }
+            }
+            if ( maxPar == -1 ) {
+                maxPar = ConfigurationConstants.NUMBER_OF_PROCESSORS;
+            }
+            this.maxParallelProcesses = maxPar;
+        }
+        this.priority = JobUtil.JobPriority.valueOf(ConfigurationConstants.DEFAULT_PRIORITY);
+        this.runLocal = false;
+        this.retries = ConfigurationConstants.DEFAULT_RETRIES;
+        this.retryDelay = ConfigurationConstants.DEFAULT_RETRY_DELAY;
+        this.serviceRanking = 0;
+        this.applicationIds = null;
+        this.matchers = null;
+        this.topics = new String[] {"<Custom:" + jobEvent.getProperty(JobUtil.PROPERTY_JOB_TOPIC) + ">"};
+        this.valid = true;
+    }
+
+    /**
+     * Check if this configuration is valid,
+     * If it is invalid, it is ignored.
+     */
+    private boolean checkIsValid() {
+        boolean hasMatchers = false;
+        if ( this.matchers != null ) {
+            for(final Matcher m : this.matchers ) {
+                if ( m != null ) {
+                    hasMatchers = true;
+                }
+            }
+        }
+        if ( !hasMatchers ) {
+            return false;
+        }
+        if ( name == null || name.length() == 0 ) {
+            return false;
+        }
+        if ( retries < 0 ) {
+            return false;
+        }
+        if ( type == Type.UNORDERED || type == Type.TOPIC_ROUND_ROBIN ) {
+            if ( maxParallelProcesses < 1 ) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public boolean isValid() {
+        return this.valid;
+    }
+
+    /**
+     * Check if the queue processes the event.
+     * @param event The event
+     */
+    public boolean match(final JobEvent event) {
+        final String topic = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+        if ( this.matchers != null ) {
+            for(final Matcher m : this.matchers ) {
+                if ( m != null ) {
+                    final String rep = m.match(topic);
+                    if ( rep != null ) {
+                        final String name = this.name.replace("{0}", rep);
+                        event.queueName = name;
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Return the name of the queue.
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Checks if the event should be skipped.
+     * This can happen if
+     * - the queue is of type ignore
+     * - the queue is bound to some application id
+     * - the event is a local event generated with a different application id
+     */
+    public boolean isSkipped(final JobEvent event) {
+        if ( this.type == Type.IGNORE ) {
+            return true;
+        }
+        if ( this.applicationIds != null ) {
+            boolean found = false;
+            for(final String id : this.applicationIds) {
+                if ( Environment.APPLICATION_ID.equals(id) ) {
+                    found = true;
+                }
+            }
+            if ( !found ) {
+                return true;
+            }
+        }
+        if ( this.runLocal
+             && !event.event.getProperty(EventUtil.PROPERTY_APPLICATION).equals(Environment.APPLICATION_ID) ) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#getRetryDelayInMs()
+     */
+    public long getRetryDelayInMs() {
+        return this.retryDelay;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxRetries()
+     */
+    public int getMaxRetries() {
+        return this.retries;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#getType()
+     */
+    public Type getType() {
+        return this.type;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#getPriority()
+     */
+    public JobUtil.JobPriority getPriority() {
+        return this.priority;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxParallel()
+     */
+    public int getMaxParallel() {
+        return this.maxParallelProcesses;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#isLocalQueue()
+     */
+    public boolean isLocalQueue() {
+        return this.runLocal;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#getApplicationIds()
+     */
+    public String[] getApplicationIds() {
+        return this.applicationIds;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#getTopics()
+     */
+    public String[] getTopics() {
+        return this.topics;
+    }
+
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#getRanking()
+     */
+    public int getRanking() {
+        return this.serviceRanking;
+    }
+
+    public String getPid() {
+        return this.pid;
+    }
+
+    @Override
+    public String toString() {
+        return "Queue-Configuration(" + this.hashCode() + ") : {" +
+            "name=" + this.name +
+            ", type=" + this.type +
+            ", topics=" + (this.matchers == null ? "[]" : Arrays.toString(this.matchers)) +
+            ", maxParallelProcesses=" + this.maxParallelProcesses +
+            ", retries=" + this.retries +
+            ", retryDelayInMs= " + this.retryDelay +
+            ", applicationIds= " + (this.applicationIds == null ? "[]" : Arrays.toString(this.applicationIds)) +
+            ", serviceRanking=" + this.serviceRanking +
+            ", pid=" + this.pid +
+            ", isValid=" + this.isValid() + "}";
+    }
+
+    private static interface Matcher {
+        String match(String topic);
+    }
+    private static final class PackageMatcher implements Matcher {
+        private final String packageName;
+
+        public PackageMatcher(final String name) {
+            this.packageName = name;
+        }
+        public String match(final String topic) {
+            final int pos = topic.lastIndexOf('/');
+            return pos > -1 && topic.substring(0, pos).equals(packageName) ? topic.substring(pos + 1) : null;
+        }
+    }
+    private static final class SubPackageMatcher implements Matcher {
+        private final String packageName;
+
+        public SubPackageMatcher(final String name) {
+            this.packageName = name + '/';
+        }
+        public String match(final String topic) {
+            final int pos = topic.lastIndexOf('/');
+            return pos > -1 && topic.substring(0, pos + 1).startsWith(this.packageName) ? topic.substring(this.packageName.length()) : null;
+        }
+    }
+    private static final class ClassMatcher implements Matcher {
+        private final String className;
+
+        public ClassMatcher(final String name) {
+            this.className = name;
+        }
+        public String match(String topic) {
+            return this.className.equals(topic) ? "" : null;
+        }
+    }
+
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Services;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.framework.BundleContext;
+import org.osgi.util.tracker.ServiceTracker;
+
+
+/**
+ * An event handler for special job events.
+ *
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ */
+@Component
+@Services({
+    @Service(value=QueueConfigurationManager.class)
+})
+public class QueueConfigurationManager {
+
+    /** Configurations - ordered by service ranking. */
+    private volatile InternalQueueConfiguration[] orderedConfigs = new InternalQueueConfiguration[0];
+
+    /** Service tracker for the configurations. */
+    private ServiceTracker configTracker;
+
+    /** Tracker count to detect changes. */
+    private volatile int lastTrackerCount = -1;
+
+
+    /**
+     * Activate this component.
+     * Create the service tracker and start it.
+     */
+    @Activate
+    protected void activate(final BundleContext bundleContext) {
+        this.configTracker = new ServiceTracker(bundleContext,
+                InternalQueueConfiguration.class.getName(), null);
+        this.configTracker.open();
+    }
+
+    /**
+     * Deactivate this component.
+     * Stop the service tracker.
+     */
+    @Deactivate
+    protected void deactivate() {
+        if ( this.configTracker != null ) {
+            this.configTracker.close();
+            this.configTracker = null;
+        }
+    }
+
+    /**
+     * Return all configurations.
+     */
+    public InternalQueueConfiguration[] getConfigurations() {
+        final int count = this.configTracker.getTrackingCount();
+        InternalQueueConfiguration[] configurations = this.orderedConfigs;
+        if ( this.lastTrackerCount < count ) {
+            synchronized ( this ) {
+                configurations = this.orderedConfigs;
+                if ( this.lastTrackerCount < count ) {
+                    final Object[] trackedConfigs = this.configTracker.getServices();
+                    if ( trackedConfigs == null || trackedConfigs.length == 0 ) {
+                        configurations = new InternalQueueConfiguration[0];
+                    } else {
+                        configurations = new InternalQueueConfiguration[trackedConfigs.length];
+                        int i = 0;
+                        for(final Object entry : trackedConfigs) {
+                            final InternalQueueConfiguration config = (InternalQueueConfiguration)entry;
+                            configurations[i] = config;
+                            i++;
+                        }
+                    }
+                    this.orderedConfigs = configurations;
+                    this.lastTrackerCount = count;
+                }
+            }
+        }
+        return configurations;
+    }
+
+    /**
+     * Find the queue configuration for the job.
+     * This method only returns a configuration if one matches.
+     */
+    public InternalQueueConfiguration getQueueConfiguration(final JobEvent event) {
+        final InternalQueueConfiguration[] configurations = this.getConfigurations();
+        // check for queue name first
+        final String queueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
+        for(final InternalQueueConfiguration config : configurations) {
+            if ( config.isValid() ) {
+                if ( queueName != null ) {
+                    if ( queueName.equals(config.getName()) ) {
+                        event.queueName = queueName;
+                        return config;
+                    }
+                } else {
+                    if ( config.match(event) ) {
+                        return config;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.console;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Date;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.event.impl.jobs.DefaultJobManager;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.Statistics;
+
+/**
+ * This is a webconsole plugin displaying the active queues, some statistics
+ * and the configurations.
+ * @since 3.0
+ */
+@Component
+@Service(value=javax.servlet.Servlet.class)
+@Properties({
+    @Property(name="felix.webconsole.label", value="slingevent", propertyPrivate=true),
+    @Property(name="felix.webconsole.title", value="Sling Eventing", propertyPrivate=true)
+})
+public class WebConsolePlugin extends HttpServlet {
+
+    private static final long serialVersionUID = -6983227434841706385L;
+
+    @Reference
+    private JobManager jobManager;
+
+    @Reference
+    private QueueConfigurationManager queueConfigManager;
+
+    /** Escape the output for html. */
+    private String escape(final String text) {
+        if ( text == null ) {
+            return "";
+        }
+        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
+    }
+
+    private Queue getQueue(final HttpServletRequest req) throws ServletException {
+        final String name = req.getParameter("queue");
+        if ( name != null ) {
+            for(final Queue q : this.jobManager.getQueues()) {
+                if ( name.equals(q.getName()) ) {
+                    return q;
+                }
+            }
+        }
+        throw new ServletException("Wrong parameters");
+    }
+
+    @Override
+    protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
+    throws ServletException, IOException {
+        final String cmd = req.getParameter("action");
+        if ( "suspend".equals(cmd) ) {
+            final Queue q = this.getQueue(req);
+            q.suspend();
+        } else if ( "resume".equals(cmd) ) {
+            final Queue q = this.getQueue(req);
+            q.resume();
+        } else if ( "clear".equals(cmd) ) {
+            final Queue q = this.getQueue(req);
+            q.clear();
+        } else if ( "dropall".equals(cmd) ) {
+            final Queue q = this.getQueue(req);
+            q.removeAll();
+        } else {
+            throw new ServletException("Unknown command");
+        }
+        resp.sendRedirect(req.getContextPath() + req.getServletPath() + req.getPathInfo());
+    }
+
+    @Override
+    protected void doGet(final HttpServletRequest req, final HttpServletResponse res)
+     throws ServletException, IOException {
+        final PrintWriter pw = res.getWriter();
+
+        pw.println("<p class='statline ui-state-highlight'>Apache Sling Eventing</p>");
+
+        pw.println("<table class='nicetable'><tbody>");
+        Statistics s = this.jobManager.getStatistics();
+        pw.println("<tr><th colspan='2'>Overall Statistics</th></tr>");
+        pw.printf("<tr><td>Start Time</td><td>%s</td></tr>", formatDate(s.getStartTime()));
+        pw.printf("<tr><td>Last Activated</td><td>%s</td></tr>", formatDate(s.getLastActivatedJobTime()));
+        pw.printf("<tr><td>Last Finished</td><td>%s</td></tr>", formatDate(s.getLastFinishedJobTime()));
+        pw.printf("<tr><td>Queued Jobs</td><td>%s</td></tr>", s.getNumberOfQueuedJobs());
+        pw.printf("<tr><td>Active Jobs</td><td>%s</td></tr>", s.getNumberOfActiveJobs());
+        pw.printf("<tr><td>Jobs</td><td>%s</td></tr>", s.getNumberOfJobs());
+        pw.printf("<tr><td>Finished Jobs</td><td>%s</td></tr>", s.getNumberOfFinishedJobs());
+        pw.printf("<tr><td>Failed Jobs</td><td>%s</td></tr>", s.getNumberOfFailedJobs());
+        pw.printf("<tr><td>Cancelled Jobs</td><td>%s</td></tr>", s.getNumberOfCancelledJobs());
+        pw.printf("<tr><td>Processed Jobs</td><td>%s</td></tr>", s.getNumberOfProcessedJobs());
+        pw.printf("<tr><td>Average Processing Time</td><td>%s</td></tr>", formatTime(s.getAverageProcessingTime()));
+        pw.printf("<tr><td>Average Waiting Time</td><td>%s</td></tr>", formatTime(s.getAverageWaitingTime()));
+        pw.println("</tbody></table>");
+        pw.println("<br/>");
+
+        boolean isEmpty = true;
+        for(final Queue q : this.jobManager.getQueues()) {
+            isEmpty = false;
+            pw.println("<div class='ui-widget-header ui-corner-top buttonGroup'>");
+            pw.printf("<span style='float: left; margin-left: 1em'>Active JobQueue: %s %s</span>", escape(q.getName()),
+                    q.isSuspended() ? "(SUSPENDED)" : "");
+            if ( q.isSuspended() ) {
+                this.printForm(pw, q, "Resume", "resume");
+            } else {
+                this.printForm(pw, q, "Suspend", "suspend");
+            }
+            this.printForm(pw, q, "Clear", "clear");
+            this.printForm(pw, q, "Drop All", "dropall");
+            pw.println("</div>");
+            pw.println("<table class='nicetable'><tbody>");
+
+            s = q.getStatistics();
+            final QueueConfiguration c = q.getConfiguration();
+            pw.println("<tr><th colspan='2'>Statistics</th><th colspan='2'>Configuration</th></tr>");
+            pw.printf("<tr><td>Start Time</td><td>%s</td><td>Type</td><td>%s</td></tr>", formatDate(s.getStartTime()), c.getType());
+            pw.printf("<tr><td>Last Activated</td><td>%s</td><td>Topics</td><td>%s</td></tr>", formatDate(s.getLastActivatedJobTime()), formatArray(c.getTopics()));
+            pw.printf("<tr><td>Last Finished</td><td>%s</td><td>Max Parallel</td><td>%s</td></tr>", formatDate(s.getLastFinishedJobTime()), c.getMaxParallel());
+            pw.printf("<tr><td>Queued Jobs</td><td>%s</td><td>Max Retries</td><td>%s</td></tr>", s.getNumberOfQueuedJobs(), c.getMaxRetries());
+            pw.printf("<tr><td>Active Jobs</td><td>%s</td><td>Retry Delay</td><td>%s ms</td></tr>", s.getNumberOfActiveJobs(), c.getRetryDelayInMs());
+            pw.printf("<tr><td>Jobs</td><td>%s</td><td>Priority</td><td>%s</td></tr>", s.getNumberOfJobs(), c.getPriority());
+            pw.printf("<tr><td>Finished Jobs</td><td>%s</td><td>Run Local</td><td>%s</td></tr>", s.getNumberOfFinishedJobs(), c.isLocalQueue());
+            pw.printf("<tr><td>Failed Jobs</td><td>%s</td><td>App Ids</td><td>%s</td></tr>", s.getNumberOfFailedJobs(), formatArray(c.getApplicationIds()));
+            pw.printf("<tr><td>Cancelled Jobs</td><td>%s</td><td colspan='2'>&nbsp</td></tr>", s.getNumberOfCancelledJobs());
+            pw.printf("<tr><td>Processed Jobs</td><td>%s</td><td colspan='2'>&nbsp</td></tr>", s.getNumberOfProcessedJobs());
+            pw.printf("<tr><td>Average Processing Time</td><td>%s</td><td colspan='2'>&nbsp</td></tr>", formatTime(s.getAverageProcessingTime()));
+            pw.printf("<tr><td>Average Waiting Time</td><td>%s</td><td colspan='2'>&nbsp</td></tr>", formatTime(s.getAverageWaitingTime()));
+            pw.printf("<tr><td>Status Info</td><td>%s</td></tr>", escape(q.getStatusInfo()));
+            pw.println("</tbody></table>");
+            pw.println("<br/>");
+        }
+        if ( isEmpty ) {
+            pw.println("<p>No active queues.</p>");
+            pw.println("<br/>");
+        }
+
+        pw.println("<p class='statline'>Apache Sling Eventing - Job Queue Configurations</p>");
+        this.printQueueConfiguration(req, pw, ((DefaultJobManager)this.jobManager).getMainQueueConfiguration());
+        final InternalQueueConfiguration[] configs = this.queueConfigManager.getConfigurations();
+        for(final InternalQueueConfiguration c : configs ) {
+            this.printQueueConfiguration(req, pw, c);
+        }
+    }
+
+    private void printQueueConfiguration(final HttpServletRequest req, final PrintWriter pw, final InternalQueueConfiguration c) {
+        pw.println("<div class='ui-widget-header ui-corner-top buttonGroup'>");
+        pw.printf("<span style='float: left; margin-left: 1em'>Job Queue Configuration: %s</span>%n",
+                escape(c.getName()));
+        pw.printf("<button id='edit' class='ui-state-default ui-corner-all' onclick='javascript:window.location=\"%s%s/configMgr/%s\";'>Edit</button>",
+                req.getContextPath(), req.getServletPath(), c.getPid());
+        pw.println("</div>");
+        pw.println("<table class='nicetable'><tbody>");
+        pw.println("<tr><th colspan='2'>Configuration</th></tr>");
+        pw.printf("<tr><td>Valid</td><td>%s</td></tr>", c.isValid());
+        pw.printf("<tr><td>Type</td><td>%s</td></tr>", c.getType());
+        pw.printf("<tr><td>Topics</td><td>%s</td></tr>", formatArray(c.getTopics()));
+        pw.printf("<tr><td>Max Parallel</td><td>%s</td></tr>", c.getMaxParallel());
+        pw.printf("<tr><td>Max Retries</td><td>%s</td></tr>", c.getMaxRetries());
+        pw.printf("<tr><td>Retry Delay</td><td>%s ms</td></tr>", c.getRetryDelayInMs());
+        pw.printf("<tr><td>Priority</td><td>%s ms</td></tr>", c.getPriority());
+        pw.printf("<tr><td>Run Local</td><td>%s ms</td></tr>", c.isLocalQueue());
+        pw.printf("<tr><td>App Ids</td><td>%s ms</td></tr>", formatArray(c.getApplicationIds()));
+        pw.printf("<tr><td>Ranking</td><td>%s ms</td></tr>", c.getRanking());
+
+        pw.println("</tbody></table>");
+        pw.println("<br/>");
+    }
+
+    /** TODO */
+    private String formatArray(final String[] array) {
+        if ( array == null || array.length == 0 ) {
+            return "";
+        }
+        return escape(Arrays.toString(array));
+    }
+
+    /** TODO */
+    private String formatDate(final long time) {
+        if ( time == -1 ) {
+            return "-";
+        }
+        final Date d = new Date(time);
+        return d.toString();
+    }
+
+    /** TODO */
+    private String formatTime(final long time) {
+        if ( time == 0 ) {
+            return "-";
+        }
+        if ( time < 1000 ) {
+            return time + " ms";
+        } else if ( time < 1000 * 60 ) {
+            return time / 1000 + " secs";
+        }
+        final long min = time / 1000 / 60;
+        final long secs = (time - min * 1000 * 60);
+        return min + " min " + secs / 1000 + " secs";
+    }
+
+    private void printForm(final PrintWriter pw,
+            final Queue q,
+            final String buttonLabel,
+            final String hiddenValue) {
+        pw.printf("<form method='POST' name='%s'><input type='hidden' name='action' value='%s'/>"+
+                "<input type='hidden' name='queue' value='%s'/>" +
+                "<button class='ui-state-default ui-corner-all' onclick='javascript:document.forms[\"%s\"].submit();'>" +
+                "%s</button></form>", hiddenValue, hiddenValue, q.getName(), hiddenValue, buttonLabel);
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import javax.jcr.Node;
+import javax.jcr.Property;
+import javax.jcr.PropertyIterator;
+import javax.jcr.PropertyType;
+import javax.jcr.RepositoryException;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.JobStatusNotifier;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+
+
+/**
+ * Helper class defining some constants and utility methods.
+ */
+public abstract class JCRHelper {
+
+    /** The namespace prefix. */
+    public static final String EVENT_PREFIX = "slingevent:";
+
+    public static final String NODE_PROPERTY_TOPIC = "slingevent:topic";
+    public static final String NODE_PROPERTY_APPLICATION = "slingevent:application";
+    public static final String NODE_PROPERTY_CREATED = "slingevent:created";
+    public static final String NODE_PROPERTY_PROPERTIES = "slingevent:properties";
+    public static final String NODE_PROPERTY_PROCESSOR = "slingevent:processor";
+    public static final String NODE_PROPERTY_JOBID = "slingevent:id";
+    public static final String NODE_PROPERTY_FINISHED = "slingevent:finished";
+    public static final String NODE_PROPERTY_TE_EXPRESSION = "slingevent:expression";
+    public static final String NODE_PROPERTY_TE_DATE = "slingevent:date";
+    public static final String NODE_PROPERTY_TE_PERIOD = "slingevent:period";
+
+    public static final String EVENT_NODE_TYPE = "slingevent:Event";
+    public static final String JOB_NODE_TYPE = "slingevent:Job";
+    public static final String TIMED_EVENT_NODE_TYPE = "slingevent:TimedEvent";
+
+    /** The nodetype for newly created intermediate folders */
+    public static final String NODETYPE_FOLDER = "sling:Folder";
+
+    /** The nodetype for newly created folders */
+    public static final String NODETYPE_ORDERED_FOLDER = "sling:OrderedFolder";
+
+
+    /** List of ignored properties to write to the repository. */
+    private static final String[] IGNORE_PROPERTIES = new String[] {
+        EventUtil.PROPERTY_DISTRIBUTE,
+        EventUtil.PROPERTY_APPLICATION,
+        JobUtil.JOB_ID,
+        JobStatusNotifier.CONTEXT_PROPERTY_NAME
+    };
+
+    /** List of ignored prefixes to read from the repository. */
+    private static final String[] IGNORE_PREFIXES = new String[] {
+        JCRHelper.EVENT_PREFIX
+    };
+
+    /**
+     * Check if this property should be ignored
+     */
+    private static boolean ignoreProperty(final String name) {
+        for(final String prop : IGNORE_PROPERTIES) {
+            if ( prop.equals(name) ) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Add all java properties as properties to the node.
+     * If the name and the value of a map entry can easily converted into
+     * a repository property, it is directly added. All other java
+     * properties are stored in one binary property.
+     *
+     * @param node The node where all properties are added to
+     * @param event The event.
+     * @throws RepositoryException
+     */
+    public static void writeEventProperties(final Node node,
+                                            final Event event)
+    throws RepositoryException {
+        if ( event != null ) {
+            final String[] propNames = event.getPropertyNames();
+            if ( propNames != null && propNames.length > 0 ) {
+                // check which props we can write directly and
+                // which we need to write as a binary blob
+                final List<String> propsAsBlob = new ArrayList<String>();
+
+                for(final String name : propNames) {
+
+                    if ( !ignoreProperty(name) ) {
+                        // sanity check
+                        final Object value = event.getProperty(name);
+                        if ( value != null ) {
+                            if ( !setProperty(name, value, node) ) {
+                                propsAsBlob.add(name);
+                            }
+                        }
+                    }
+                }
+                // write the remaining properties as a blob
+                if ( propsAsBlob.size() > 0 ) {
+                    try {
+                        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                        final ObjectOutputStream oos = new ObjectOutputStream(baos);
+                        oos.writeInt(propsAsBlob.size());
+                        for(final String propName : propsAsBlob) {
+                            oos.writeObject(propName);
+                            try {
+                                oos.writeObject(event.getProperty(propName));
+                            } catch (IOException ioe) {
+                                throw new RepositoryException("Unable to serialize property " + propName, ioe);
+                            }
+                        }
+                        oos.close();
+                        node.setProperty(JCRHelper.NODE_PROPERTY_PROPERTIES,
+                                node.getSession().getValueFactory().createBinary(new ByteArrayInputStream(baos.toByteArray())));
+                    } catch (IOException ioe) {
+                        throw new RepositoryException("Unable to serialize event " + EventUtil.toString(event), ioe);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Read event properties from a repository node and create a property map (dictionary).
+     * As the properties might contain serialized java objects, a class loader can be specified
+     * for loading classes of the serialized java objects.
+     * @throws RepositoryException
+     * @throws ClassNotFoundException
+     */
+    public static Dictionary<String, Object> readEventProperties(final Node node,
+                                                                 final ClassLoader objectClassLoader,
+                                                                 final boolean forceLoad)
+    throws RepositoryException, ClassNotFoundException {
+        final Dictionary<String, Object> properties = new Hashtable<String, Object>();
+
+        // check the properties blob
+        if ( node.hasProperty(JCRHelper.NODE_PROPERTY_PROPERTIES) ) {
+            try {
+                final ObjectInputStream ois = new ObjectInputStream(node.getProperty(JCRHelper.NODE_PROPERTY_PROPERTIES).getBinary().getStream(),
+                        objectClassLoader);
+                int length = ois.readInt();
+                for(int i=0;i<length;i++) {
+                    final String key = (String)ois.readObject();
+                    final Object value = ois.readObject();
+                    properties.put(key, value);
+                }
+            } catch (ClassNotFoundException cnfe) {
+                if ( !forceLoad ) {
+                    throw cnfe;
+                }
+            } catch (java.io.InvalidClassException ice) {
+                if ( !forceLoad ) {
+                    throw new ClassNotFoundException("Found invalid class.", ice);
+                }
+            } catch (IOException ioe) {
+                if ( !forceLoad ) {
+                    throw new RepositoryException("Unable to deserialize event properties.", ioe);
+                }
+            }
+        }
+        // now all properties that have been set directly
+        final PropertyIterator pI = node.getProperties();
+        while ( pI.hasNext() ) {
+            final Property p = pI.nextProperty();
+            boolean ignore = p.getName().startsWith("jcr:");
+            if ( !ignore) {
+                int index = 0;
+                while ( !ignore && index < IGNORE_PREFIXES.length ) {
+                    ignore = p.getName().startsWith(IGNORE_PREFIXES[index]);
+                    index++;
+                }
+            }
+            if ( !ignore ) {
+                final String name = ISO9075.decode(p.getName());
+                if ( p.getDefinition().isMultiple() ) {
+                    final Value[] values = p.getValues();
+                    if ( values.length > 0 ) {
+                        // get first value
+                        final Object firstObject = getPropertyValue(values[0]);
+                        final Object[] array;
+                        if ( firstObject instanceof Boolean ) {
+                            array = new Boolean[values.length];
+                        } else if ( firstObject instanceof Calendar ) {
+                            array = new Calendar[values.length];
+                        } else if ( firstObject instanceof Double ) {
+                            array = new Double[values.length];
+                        } else if ( firstObject instanceof Long ) {
+                            array = new Long[values.length];
+                        } else if ( firstObject instanceof BigDecimal) {
+                            array = new BigDecimal[values.length];
+                        } else {
+                            array = new String[values.length];
+                        }
+                        array[0] = firstObject;
+                        int index = 1;
+                        while ( index < values.length ) {
+                            array[index] = getPropertyValue(values[index]);
+                            index++;
+                        }
+                        properties.put(name, array);
+                    }
+                } else {
+                    final Value value = p.getValue();
+                    final Object o = getPropertyValue(value);
+                    properties.put(name, o);
+                }
+            }
+        }
+        return properties;
+    }
+
+    /**
+     * Return the converted repository property name
+     * @param name The java object property name
+     * @return The converted name or null if not possible.
+     */
+    public static String getNodePropertyName(final String name) {
+        // if name contains a colon, we can't set it as a property
+        if ( name.indexOf(':') != -1 ) {
+            return null;
+        }
+        return ISO9075.encode(name);
+    }
+
+    /**
+     * Return the converted repository property value
+     * @param valueFactory The value factory
+     * @param eventValue The event value
+     * @return The converted value or null if not possible
+     */
+    public static Value getNodePropertyValue(final ValueFactory valueFactory, final Object eventValue) {
+        final Value val;
+        if (eventValue instanceof Calendar) {
+            val = valueFactory.createValue((Calendar)eventValue);
+        } else if (eventValue instanceof Long) {
+            val = valueFactory.createValue((Long)eventValue);
+        } else if (eventValue instanceof Double) {
+            val = valueFactory.createValue(((Double)eventValue).doubleValue());
+        } else if (eventValue instanceof Boolean) {
+            val = valueFactory.createValue((Boolean) eventValue);
+        } else if (eventValue instanceof BigDecimal) {
+            val = valueFactory.createValue((BigDecimal) eventValue);
+        } else if (eventValue instanceof String) {
+            val = valueFactory.createValue((String)eventValue);
+        } else {
+            val = null;
+        }
+        return val;
+    }
+
+    /**
+     * Convert the value back to an object.
+     * @param value
+     * @return
+     * @throws RepositoryException
+     */
+    private static Object getPropertyValue(final Value value)
+    throws RepositoryException {
+        final Object o;
+        switch (value.getType()) {
+            case PropertyType.BOOLEAN:
+                o = value.getBoolean(); break;
+            case PropertyType.DATE:
+                o = value.getDate(); break;
+            case PropertyType.DOUBLE:
+                o = value.getDouble(); break;
+            case PropertyType.LONG:
+                o = value.getLong(); break;
+            case PropertyType.STRING:
+                o = value.getString(); break;
+            case PropertyType.DECIMAL:
+                o = value.getDecimal(); break;
+            default: // this should never happen - we convert to a string...
+                o = value.getString();
+        }
+        return o;
+    }
+
+    /**
+     * Try to set the java property as a property of the node.
+     * @param name
+     * @param value
+     * @param node
+     * @return
+     * @throws RepositoryException
+     */
+    private static boolean setProperty(String name, Object value, Node node)
+    throws RepositoryException {
+        final String propName = getNodePropertyName(name);
+        if ( propName == null ) {
+            return false;
+        }
+        final ValueFactory fac = node.getSession().getValueFactory();
+        // check for multi value
+        if ( value.getClass().isArray() ) {
+            final Object[] array = (Object[])value;
+            // now we try to convert each value
+            // and check if all converted values have the same type
+            final Value[] values = new Value[array.length];
+            int index = 0;
+            for(final Object v : array ) {
+                values[index] = getNodePropertyValue(fac, v);
+                if ( values[index] == null ) {
+                    return false;
+                }
+                if ( index > 0 && !values[index-1].getClass().equals(values[index].getClass()) ) {
+                    return false;
+                }
+                index++;
+            }
+            node.setProperty(propName, values);
+            return true;
+        }
+        final Value val = getNodePropertyValue(fac, value);
+        if ( val != null ) {
+            node.setProperty(propName, val);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * This is an extended version of the object input stream which uses the
+     * thread context class loader.
+     */
+    private static class ObjectInputStream extends java.io.ObjectInputStream {
+
+        private ClassLoader classloader;
+
+        public ObjectInputStream(final InputStream in, final ClassLoader classLoader) throws IOException {
+            super(in);
+            this.classloader = classLoader;
+        }
+
+        /**
+         * @see java.io.ObjectInputStream#resolveClass(java.io.ObjectStreamClass)
+         */
+        @Override
+        protected Class<?> resolveClass(java.io.ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+            if ( this.classloader != null ) {
+                return Class.forName(classDesc.getName(), true, this.classloader);
+            }
+            return super.resolveClass(classDesc);
+        }
+    }
+}

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+
+/**
+ * This object encapsulates all information about a job.
+ */
+public class JCRJobEvent extends JobEvent {
+
+    private final PersistenceHandler handler;
+
+    public JCRJobEvent(final Event e, final PersistenceHandler handler) {
+        super(e, (String)e.getProperty(JobUtil.JOB_ID));
+        this.handler = handler;
+    }
+
+    @Override
+    public boolean lock() {
+        return this.handler.lock(this);
+    }
+
+    @Override
+    public void unlock() {
+        this.handler.unlock(this);
+    }
+
+    @Override
+    public void finished() {
+        this.handler.finished(this);
+    }
+
+    @Override
+    public boolean reschedule() {
+        return this.handler.reschedule(this);
+    }
+
+    @Override
+    public boolean remove() {
+        return this.handler.remove(this.uniqueId);
+    }
+}
\ No newline at end of file

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain