You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2010/10/11 08:54:14 UTC
svn commit: r1021247 [3/6] - in /sling/branches/eventing-3.0: ./ .settings/
src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/sling/ src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/...
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.osgi.service.event.Event;
+
+public interface JobStatusNotifier {
+
+ String CONTEXT_PROPERTY_NAME = JobStatusNotifier.class.getName();
+
+ class NotifierContext {
+ public final JobStatusNotifier notifier;
+
+ public NotifierContext(JobStatusNotifier n) {
+ this.notifier = n;
+ }
+ }
+
+ /**
+ * Send an acknowledge message that someone is processing the job.
+ * @param job The job.
+ * @return <code>true</code> if the ack is ok, <code>false</code> otherwise (e.g. if
+ * someone else already send an ack for this job.
+ */
+ boolean sendAcknowledge(Event job);
+
+ /**
+ * Notify that the job is finished.
+ * If the job is not rescheduled, a return value of <code>false</code> indicates an error
+ * during the processing. If the job should be rescheduled, <code>true</code> indicates
+ * that the job could be rescheduled. If an error occurs or the number of retries is
+ * exceeded, <code>false</code> will be returned.
+ * @param job The job.
+ * @param reschedule Should the event be rescheduled?
+ * @return <code>true</code> if everything went fine, <code>false</code> otherwise.
+ */
+ boolean finishedJob(Event job, boolean reschedule);
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.sling.event.jobs.JobsIterator;
+import org.osgi.service.event.Event;
+
+/**
+ * Jobs iterator.
+ */
+public class JobsIteratorImpl implements JobsIterator {
+
+ /** The events list size. */
+ private final long size;
+
+ /** The current position. */
+ private int index = 0;
+
+ /** The iterator. */
+ private final Iterator<Event> iter;
+
+ public JobsIteratorImpl(final List<Event> events) {
+ this.size = events.size();
+ this.iter = events.iterator();
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobsIterator#getPosition()
+ */
+ public long getPosition() {
+ return this.index;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobsIterator#getSize()
+ */
+ public long getSize() {
+ return this.size;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.JobsIterator#skip(long)
+ */
+ public void skip(long skipNum) {
+ if ( skipNum < 0 ) {
+ throw new IllegalArgumentException();
+ }
+ if ( this.index + skipNum >= this.size ) {
+ throw new NoSuchElementException();
+ }
+ for(long i=0; i<skipNum; i++) {
+ this.iter.next();
+ }
+ this.index += skipNum;
+ }
+
+ /**
+ * @see java.util.Iterator#hasNext()
+ */
+ public boolean hasNext() {
+ return this.iter.hasNext();
+ }
+
+ /**
+ * @see java.util.Iterator#next()
+ */
+ public Event next() {
+ return this.iter.next();
+ }
+
+ /**
+ * @see java.util.Iterator#remove()
+ */
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @see java.lang.Iterable#iterator()
+ */
+ public Iterator<Event> iterator() {
+ return this;
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import org.apache.sling.event.jobs.Statistics;
+
+/**
+ * Implementation of the statistics.
+ */
+public class StatisticsImpl implements Statistics {
+
+ private final long startTime;
+
+ private volatile long activeJobs;
+
+ private volatile long queuedJobs;
+
+ private volatile long lastActivated = -1;
+
+ private volatile long lastFinished = -1;
+
+ private volatile long averageWaitingTime;
+
+ private volatile long averageProcessingTime;
+
+ private volatile long waitingTime;
+
+ private volatile long processingTime;
+
+ private volatile long waitingCount;
+
+ private volatile long processingCount;
+
+ private volatile long finishedJobs;
+
+ private volatile long failedJobs;
+
+ private volatile long cancelledJobs;
+
+ public StatisticsImpl() {
+ this.startTime = System.currentTimeMillis();
+ }
+
+ public StatisticsImpl(final long startTime) {
+ this.startTime = startTime;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getStartTime()
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getNumberOfProcessedJobs()
+ */
+ public synchronized long getNumberOfProcessedJobs() {
+ return getNumberOfCancelledJobs() + getNumberOfFailedJobs() + getNumberOfFinishedJobs();
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getNumberOfActiveJobs()
+ */
+ public synchronized long getNumberOfActiveJobs() {
+ return activeJobs;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getNumberOfQueuedJobs()
+ */
+ public synchronized long getNumberOfQueuedJobs() {
+ return queuedJobs;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getNumberOfJobs()
+ */
+ public synchronized long getNumberOfJobs() {
+ return activeJobs + queuedJobs;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getAverageWaitingTime()
+ */
+ public synchronized long getAverageWaitingTime() {
+ return averageWaitingTime;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getAverageProcessingTime()
+ */
+ public synchronized long getAverageProcessingTime() {
+ return averageProcessingTime;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getNumberOfFinishedJobs()
+ */
+ public synchronized long getNumberOfFinishedJobs() {
+ return finishedJobs;
+ }
+
+ public synchronized long getNumberOfCancelledJobs() {
+ return cancelledJobs;
+ }
+
+ public synchronized long getNumberOfFailedJobs() {
+ return failedJobs;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getLastActivatedJobTime()
+ */
+ public synchronized long getLastActivatedJobTime() {
+ return this.lastActivated;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.Statistics#getLastFinishedJobTime()
+ */
+ public synchronized long getLastFinishedJobTime() {
+ return this.lastFinished;
+ }
+
+ /**
+ * Add a finished job
+ */
+ public synchronized void finishedJob(final long time) {
+ this.lastFinished = System.currentTimeMillis();
+ this.processingTime += time;
+ this.processingCount++;
+ this.averageProcessingTime = this.processingTime / this.processingCount;
+ this.finishedJobs++;
+ this.activeJobs--;
+ }
+
+ public synchronized void failedJob() {
+ this.failedJobs++;
+ this.activeJobs--;
+ this.queuedJobs++;
+ }
+
+ public synchronized void cancelledJob() {
+ this.cancelledJobs++;
+ this.activeJobs--;
+ }
+
+ /**
+ * New job in the qeue
+ */
+ public synchronized void incQueued() {
+ this.queuedJobs++;
+ }
+
+ /**
+ * Job not processed by us
+ */
+ public synchronized void decQueued() {
+ this.queuedJobs--;
+ }
+
+ /**
+ * Clear
+ */
+ public synchronized void clearQueued() {
+ this.queuedJobs = 0;
+ }
+
+ /**
+ * Add a job from the queue to status active
+ * @param time The time the job stayed in the queue.
+ */
+ public synchronized void addActive(final long time) {
+ this.queuedJobs--;
+ this.activeJobs++;
+ this.waitingCount++;
+ this.waitingTime += time;
+ this.averageWaitingTime = this.waitingTime / this.waitingCount;
+ this.lastActivated = System.currentTimeMillis();
+ }
+
+ public synchronized void add(final StatisticsImpl other) {
+ synchronized ( other ) {
+ this.queuedJobs += other.queuedJobs;
+
+ if ( other.lastActivated > this.lastActivated ) {
+ this.lastActivated = other.lastActivated;
+ }
+ if ( other.lastFinished > this.lastFinished ) {
+ this.lastFinished = other.lastFinished;
+ }
+ this.waitingTime += other.waitingTime;
+ this.waitingCount += other.waitingCount;
+ this.averageWaitingTime = this.waitingTime / this.waitingCount;
+ this.processingTime += other.processingTime;
+ this.processingCount += other.processingCount;
+ this.averageProcessingTime = this.processingTime / this.processingCount;
+ this.finishedJobs += other.finishedJobs;
+ this.failedJobs += other.failedJobs;
+ this.cancelledJobs += other.cancelledJobs;
+ }
+ }
+
+ public synchronized StatisticsImpl copy() {
+ final StatisticsImpl other = new StatisticsImpl();
+ other.queuedJobs = this.queuedJobs;
+ other.lastActivated = this.lastActivated;
+ other.lastFinished = this.lastFinished;
+ other.averageWaitingTime = this.averageWaitingTime;
+ other.averageProcessingTime = this.averageProcessingTime;
+ other.waitingTime = this.waitingTime;
+ other.processingTime = this.processingTime;
+ other.waitingCount = this.waitingCount;
+ other.processingCount = this.processingCount;
+ other.finishedJobs = this.finishedJobs;
+ other.failedJobs = this.failedJobs;
+ other.cancelledJobs = this.cancelledJobs;
+ return other;
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.UUID;
+
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+
+public class Utility {
+
+ /** Allowed characters for a node name */
+ private static final String ALLOWED_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+*#!ยค$%&()=[]?";
+ /** Replacement characters for unallowed characters in a node name */
+ private static final char REPLACEMENT_CHAR = '_';
+
+ /**
+ * Filter the node name for not allowed characters and replace them.
+ * @param nodeName The suggested node name.
+ * @return The filtered node name.
+ */
+ public static String filter(final String nodeName) {
+ final StringBuilder sb = new StringBuilder();
+ char lastAdded = 0;
+
+ for(int i=0; i < nodeName.length(); i++) {
+ final char c = nodeName.charAt(i);
+ char toAdd = c;
+
+ if (ALLOWED_CHARS.indexOf(c) < 0) {
+ if (lastAdded == REPLACEMENT_CHAR) {
+ // do not add several _ in a row
+ continue;
+ }
+ toAdd = REPLACEMENT_CHAR;
+
+ } else if(i == 0 && Character.isDigit(c)) {
+ sb.append(REPLACEMENT_CHAR);
+ }
+
+ sb.append(toAdd);
+ lastAdded = toAdd;
+ }
+
+ if (sb.length()==0) {
+ sb.append(REPLACEMENT_CHAR);
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * used for the md5
+ */
+ public static final char[] hexTable = "0123456789abcdef".toCharArray();
+
+ /**
+ * Calculate an MD5 hash of the string given using 'utf-8' encoding.
+ *
+ * @param data the data to encode
+ * @return a hex encoded string of the md5 digested input
+ */
+ public static String md5(String data) {
+ try {
+ return digest("MD5", data.getBytes("utf-8"));
+ } catch (NoSuchAlgorithmException e) {
+ throw new InternalError("MD5 digest not available???");
+ } catch (UnsupportedEncodingException e) {
+ throw new InternalError("UTF8 digest not available???");
+ }
+ }
+
+ /**
+ * Digest the plain string using the given algorithm.
+ *
+ * @param algorithm The alogrithm for the digest. This algorithm must be
+ * supported by the MessageDigest class.
+ * @param data the data to digest with the given algorithm
+ * @return The digested plain text String represented as Hex digits.
+ * @throws java.security.NoSuchAlgorithmException if the desired algorithm is not supported by
+ * the MessageDigest class.
+ */
+ private static String digest(String algorithm, byte[] data)
+ throws NoSuchAlgorithmException {
+ MessageDigest md = MessageDigest.getInstance(algorithm);
+ byte[] digest = md.digest(data);
+ StringBuilder res = new StringBuilder(digest.length * 2);
+ for (int i = 0; i < digest.length; i++) {
+ byte b = digest[i];
+ res.append(hexTable[(b >> 4) & 15]);
+ res.append(hexTable[b & 15]);
+ }
+ return res.toString();
+ }
+
+
+ /**
+ * Create a unique node path (folder and name) for the job.
+ */
+ public static String getUniquePath(final String jobTopic, final String jobId) {
+ final StringBuilder sb = new StringBuilder(jobTopic.replace('/', '.'));
+ sb.append('/');
+ if ( jobId != null ) {
+ // we create an md from the job id - we use the first 6 bytes to
+ // create sub directories
+ final String md5 = md5(jobId);
+ sb.append(md5.substring(0, 2));
+ sb.append('/');
+ sb.append(md5.substring(2, 4));
+ sb.append('/');
+ sb.append(md5.substring(4, 6));
+ sb.append('/');
+ sb.append(filter(jobId));
+ } else {
+ // create a path from the uuid - we use the first 6 bytes to
+ // create sub directories
+ final String uuid = UUID.randomUUID().toString();
+ sb.append(uuid.substring(0, 2));
+ sb.append('/');
+ sb.append(uuid.substring(2, 4));
+ sb.append('/');
+ sb.append(uuid.substring(5, 7));
+ sb.append("/Job_");
+ sb.append(uuid.substring(8, 17));
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Helper method for sending the notification events.
+ */
+ public static void sendNotification(final EnvironmentComponent environment,
+ final String topic,
+ final Event job) {
+ final EventAdmin localEA = environment.getEventAdmin();
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(JobUtil.PROPERTY_NOTIFICATION_JOB, job);
+ props.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
+ localEA.postEvent(new Event(topic, props));
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+/**
+ * Constants for the queue configuration.
+ */
+public abstract class ConfigurationConstants {
+
+ public static final int NUMBER_OF_PROCESSORS = Runtime.getRuntime().availableProcessors();
+
+ public static final String DEFAULT_TYPE = "UNORDERED";
+ public static final String DEFAULT_PRIORITY = "NORM";
+ public static final boolean DEFAULT_RUN_LOCAL = false;
+ public static final int DEFAULT_RETRIES = 10;
+ public static final long DEFAULT_RETRY_DELAY = 2000;
+ public static final int DEFAULT_MAX_PARALLEL = 15;
+
+ public static final String PROP_NAME = "queue.name";
+ public static final String PROP_TYPE = "queue.type";
+ public static final String PROP_TOPICS = "queue.topics";
+ public static final String PROP_MAX_PARALLEL = "queue.maxparallel";
+ public static final String PROP_RETRIES = "queue.retries";
+ public static final String PROP_RETRY_DELAY = "queue.retrydelay";
+ public static final String PROP_PRIORITY = "queue.priority";
+ public static final String PROP_RUN_LOCAL = "queue.runlocal";
+ public static final String PROP_APP_IDS = "queue.applicationids";
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.PropertyUnbounded;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.OsgiUtil;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.osgi.framework.Constants;
+import org.osgi.service.event.Event;
+
+@Component(metatype=true,name="org.apache.sling.event.jobs.QueueConfiguration",
+ label="%queue.name", description="%queue.description",
+ configurationFactory=true,policy=ConfigurationPolicy.REQUIRE)
+@Service(value=InternalQueueConfiguration.class)
+@Properties({
+ @Property(name=ConfigurationConstants.PROP_NAME),
+ @Property(name=ConfigurationConstants.PROP_TYPE,
+ value=ConfigurationConstants.DEFAULT_TYPE,
+ options={@PropertyOption(name="UNORDERED",value="Parallel"),
+ @PropertyOption(name="ORDERED",value="Ordered"),
+ @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin"),
+ @PropertyOption(name="IGNORE",value="Ignore")}),
+ @Property(name=ConfigurationConstants.PROP_TOPICS,
+ unbounded=PropertyUnbounded.ARRAY),
+ @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
+ intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL),
+ @Property(name=ConfigurationConstants.PROP_RETRIES,
+ intValue=ConfigurationConstants.DEFAULT_RETRIES),
+ @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
+ longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+ @Property(name=ConfigurationConstants.PROP_PRIORITY,
+ value=ConfigurationConstants.DEFAULT_PRIORITY,
+ options={@PropertyOption(name="NORM",value="Norm"),
+ @PropertyOption(name="MIN",value="Min"),
+ @PropertyOption(name="MAX",value="Max")}),
+ @Property(name=ConfigurationConstants.PROP_RUN_LOCAL,
+ boolValue=ConfigurationConstants.DEFAULT_RUN_LOCAL),
+ @Property(name=ConfigurationConstants.PROP_APP_IDS,
+ unbounded=PropertyUnbounded.ARRAY)
+})
+public class InternalQueueConfiguration
+ implements QueueConfiguration {
+
+ /** The name of the queue. */
+ private String name;
+
+ /** The queue type. */
+ private Type type;
+
+ /** Number of retries. */
+ private int retries;
+
+ /** Retry delay. */
+ private long retryDelay;
+
+ /** Local queue? */
+ private boolean runLocal;
+
+ /** Thread priority. */
+ private JobUtil.JobPriority priority;
+
+ /** The maximum number of parallel processes (for non ordered queues) */
+ private int maxParallelProcesses;
+
+ /** Optional application ids where this queue is running on. */
+ private String[] applicationIds;
+
+ /** The ordering. */
+ private int serviceRanking;
+
+ /** The matchers for topics. */
+ private Matcher[] matchers;
+
+ /** The configured topics. */
+ private String[] topics;
+
+ /** Valid flag. */
+ private boolean valid = false;
+
+ private String pid;
+
+ /**
+ * Create a new configuration from a config
+ */
+ public static InternalQueueConfiguration fromConfiguration(final Map<String, Object> params) {
+ final InternalQueueConfiguration c = new InternalQueueConfiguration();
+ c.activate(params);
+ return c;
+ }
+
+ public InternalQueueConfiguration() {
+ // nothing to do, see activate
+ }
+
+ /**
+ * Create a new queue configuration
+ */
+ @Activate
+ protected void activate(final Map<String, Object> params) {
+ this.name = OsgiUtil.toString(params.get(ConfigurationConstants.PROP_NAME), null);
+ this.priority = JobUtil.JobPriority.valueOf(OsgiUtil.toString(params.get(ConfigurationConstants.PROP_PRIORITY), ConfigurationConstants.DEFAULT_PRIORITY));
+ this.type = Type.valueOf(OsgiUtil.toString(params.get(ConfigurationConstants.PROP_TYPE), ConfigurationConstants.DEFAULT_TYPE));
+ this.runLocal = OsgiUtil.toBoolean(params.get(ConfigurationConstants.PROP_RUN_LOCAL), ConfigurationConstants.DEFAULT_RUN_LOCAL);
+ this.retries = OsgiUtil.toInteger(params.get(ConfigurationConstants.PROP_RETRIES), ConfigurationConstants.DEFAULT_RETRIES);
+ this.retryDelay = OsgiUtil.toLong(params.get(ConfigurationConstants.PROP_RETRY_DELAY), ConfigurationConstants.DEFAULT_RETRY_DELAY);
+ final int maxParallel = OsgiUtil.toInteger(params.get(ConfigurationConstants.PROP_MAX_PARALLEL), ConfigurationConstants.DEFAULT_MAX_PARALLEL);
+ this.maxParallelProcesses = (maxParallel == -1 ? ConfigurationConstants.NUMBER_OF_PROCESSORS : maxParallel);
+ final String appIds[] = OsgiUtil.toStringArray(params.get(ConfigurationConstants.PROP_APP_IDS));
+ if ( appIds == null
+ || appIds.length == 0
+ || (appIds.length == 1 && (appIds[0] == null || appIds[0].length() == 0)) ) {
+ this.applicationIds = null;
+ } else {
+ this.applicationIds = appIds;
+ }
+ final String[] topics = OsgiUtil.toStringArray(params.get(ConfigurationConstants.PROP_TOPICS));
+ if ( topics == null
+ || topics.length == 0
+ || (topics.length == 1 && (topics[0] == null || topics[0].length() == 0))) {
+ matchers = null;
+ this.topics = null;
+ } else {
+ final Matcher[] newMatchers = new Matcher[topics.length];
+ for(int i=0; i < topics.length; i++) {
+ String value = topics[i];
+ if ( value != null ) {
+ value = value.trim();
+ }
+ if ( value != null && value.length() > 0 ) {
+ if ( value.endsWith(".") ) {
+ newMatchers[i] = new PackageMatcher(value.substring(0, value.length() - 1));
+ } else if ( value.endsWith("*") ) {
+ newMatchers[i] = new SubPackageMatcher(value.substring(0, value.length() - 1));
+ } else {
+ newMatchers[i] = new ClassMatcher(value);
+ }
+ }
+ }
+ matchers = newMatchers;
+ this.topics = topics;
+ }
+ this.serviceRanking = OsgiUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
+ this.pid = (String)params.get(Constants.SERVICE_PID);
+ this.valid = this.checkIsValid();
+ }
+
+ public InternalQueueConfiguration(final Event jobEvent) {
+ this.name = (String)jobEvent.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
+ if ( jobEvent.getProperty(JobUtil.PROPERTY_JOB_QUEUE_ORDERED) != null ) {
+ this.type = Type.ORDERED;
+ this.maxParallelProcesses = 1;
+ } else {
+ this.type = Type.UNORDERED;
+ int maxPar = ConfigurationConstants.DEFAULT_MAX_PARALLEL;
+ final Object value = jobEvent.getProperty(JobUtil.PROPERTY_JOB_PARALLEL);
+ if ( value != null ) {
+ if ( value instanceof Boolean ) {
+ final boolean result = ((Boolean)value).booleanValue();
+ if ( !result ) {
+ maxPar = 1;
+ }
+ } else if ( value instanceof Number ) {
+ final int result = ((Number)value).intValue();
+ if ( result > 1 ) {
+ maxPar = result;
+ } else {
+ maxPar = 1;
+ }
+ } else {
+ final String strValue = value.toString();
+ if ( "no".equalsIgnoreCase(strValue) || "false".equalsIgnoreCase(strValue) ) {
+ maxPar = 1;
+ } else {
+ // check if this is a number
+ try {
+ final int result = Integer.valueOf(strValue).intValue();
+ if ( result > 1 ) {
+ maxPar = result;
+ } else {
+ maxPar = 1;
+ }
+ } catch (NumberFormatException ne) {
+ // we ignore this
+ }
+ }
+ }
+ }
+ if ( maxPar == -1 ) {
+ maxPar = ConfigurationConstants.NUMBER_OF_PROCESSORS;
+ }
+ this.maxParallelProcesses = maxPar;
+ }
+ this.priority = JobUtil.JobPriority.valueOf(ConfigurationConstants.DEFAULT_PRIORITY);
+ this.runLocal = false;
+ this.retries = ConfigurationConstants.DEFAULT_RETRIES;
+ this.retryDelay = ConfigurationConstants.DEFAULT_RETRY_DELAY;
+ this.serviceRanking = 0;
+ this.applicationIds = null;
+ this.matchers = null;
+ this.topics = new String[] {"<Custom:" + jobEvent.getProperty(JobUtil.PROPERTY_JOB_TOPIC) + ">"};
+ this.valid = true;
+ }
+
+ /**
+ * Check if this configuration is valid,
+ * If it is invalid, it is ignored.
+ */
+ private boolean checkIsValid() {
+ boolean hasMatchers = false;
+ if ( this.matchers != null ) {
+ for(final Matcher m : this.matchers ) {
+ if ( m != null ) {
+ hasMatchers = true;
+ }
+ }
+ }
+ if ( !hasMatchers ) {
+ return false;
+ }
+ if ( name == null || name.length() == 0 ) {
+ return false;
+ }
+ if ( retries < 0 ) {
+ return false;
+ }
+ if ( type == Type.UNORDERED || type == Type.TOPIC_ROUND_ROBIN ) {
+ if ( maxParallelProcesses < 1 ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean isValid() {
+ return this.valid;
+ }
+
+ /**
+ * Check if the queue processes the event.
+ * @param event The event
+ */
+ public boolean match(final JobEvent event) {
+ final String topic = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+ if ( this.matchers != null ) {
+ for(final Matcher m : this.matchers ) {
+ if ( m != null ) {
+ final String rep = m.match(topic);
+ if ( rep != null ) {
+ final String name = this.name.replace("{0}", rep);
+ event.queueName = name;
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Return the name of the queue.
+ */
+ public String getName() {
+ return this.name;
+ }
+
+ /**
+ * Checks if the event should be skipped.
+ * This can happen if
+ * - the queue is of type ignore
+ * - the queue is bound to some application id
+ * - the event is a local event generated with a different application id
+ */
+ public boolean isSkipped(final JobEvent event) {
+ if ( this.type == Type.IGNORE ) {
+ return true;
+ }
+ if ( this.applicationIds != null ) {
+ boolean found = false;
+ for(final String id : this.applicationIds) {
+ if ( Environment.APPLICATION_ID.equals(id) ) {
+ found = true;
+ }
+ }
+ if ( !found ) {
+ return true;
+ }
+ }
+ if ( this.runLocal
+ && !event.event.getProperty(EventUtil.PROPERTY_APPLICATION).equals(Environment.APPLICATION_ID) ) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#getRetryDelayInMs()
+ */
+ public long getRetryDelayInMs() {
+ return this.retryDelay;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxRetries()
+ */
+ public int getMaxRetries() {
+ return this.retries;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#getType()
+ */
+ public Type getType() {
+ return this.type;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#getPriority()
+ */
+ public JobUtil.JobPriority getPriority() {
+ return this.priority;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxParallel()
+ */
+ public int getMaxParallel() {
+ return this.maxParallelProcesses;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#isLocalQueue()
+ */
+ public boolean isLocalQueue() {
+ return this.runLocal;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#getApplicationIds()
+ */
+ public String[] getApplicationIds() {
+ return this.applicationIds;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#getTopics()
+ */
+ public String[] getTopics() {
+ return this.topics;
+ }
+
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#getRanking()
+ */
+ public int getRanking() {
+ return this.serviceRanking;
+ }
+
+ public String getPid() {
+ return this.pid;
+ }
+
+ @Override
+ public String toString() {
+ return "Queue-Configuration(" + this.hashCode() + ") : {" +
+ "name=" + this.name +
+ ", type=" + this.type +
+ ", topics=" + (this.matchers == null ? "[]" : Arrays.toString(this.matchers)) +
+ ", maxParallelProcesses=" + this.maxParallelProcesses +
+ ", retries=" + this.retries +
+ ", retryDelayInMs= " + this.retryDelay +
+ ", applicationIds= " + (this.applicationIds == null ? "[]" : Arrays.toString(this.applicationIds)) +
+ ", serviceRanking=" + this.serviceRanking +
+ ", pid=" + this.pid +
+ ", isValid=" + this.isValid() + "}";
+ }
+
+ private static interface Matcher {
+ String match(String topic);
+ }
+ private static final class PackageMatcher implements Matcher {
+ private final String packageName;
+
+ public PackageMatcher(final String name) {
+ this.packageName = name;
+ }
+ public String match(final String topic) {
+ final int pos = topic.lastIndexOf('/');
+ return pos > -1 && topic.substring(0, pos).equals(packageName) ? topic.substring(pos + 1) : null;
+ }
+ }
+ private static final class SubPackageMatcher implements Matcher {
+ private final String packageName;
+
+ public SubPackageMatcher(final String name) {
+ this.packageName = name + '/';
+ }
+ public String match(final String topic) {
+ final int pos = topic.lastIndexOf('/');
+ return pos > -1 && topic.substring(0, pos + 1).startsWith(this.packageName) ? topic.substring(this.packageName.length()) : null;
+ }
+ }
+ private static final class ClassMatcher implements Matcher {
+ private final String className;
+
+ public ClassMatcher(final String name) {
+ this.className = name;
+ }
+ public String match(String topic) {
+ return this.className.equals(topic) ? "" : null;
+ }
+ }
+
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.scr.annotations.Services;
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.framework.BundleContext;
+import org.osgi.util.tracker.ServiceTracker;
+
+
+/**
+ * An event handler for special job events.
+ *
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ */
+@Component
+@Services({
+ @Service(value=QueueConfigurationManager.class)
+})
+public class QueueConfigurationManager {
+
+ /** Configurations - ordered by service ranking. */
+ private volatile InternalQueueConfiguration[] orderedConfigs = new InternalQueueConfiguration[0];
+
+ /** Service tracker for the configurations. */
+ private ServiceTracker configTracker;
+
+ /** Tracker count to detect changes. */
+ private volatile int lastTrackerCount = -1;
+
+
+ /**
+ * Activate this component.
+ * Create the service tracker and start it.
+ */
+ @Activate
+ protected void activate(final BundleContext bundleContext) {
+ this.configTracker = new ServiceTracker(bundleContext,
+ InternalQueueConfiguration.class.getName(), null);
+ this.configTracker.open();
+ }
+
+ /**
+ * Deactivate this component.
+ * Stop the service tracker.
+ */
+ @Deactivate
+ protected void deactivate() {
+ if ( this.configTracker != null ) {
+ this.configTracker.close();
+ this.configTracker = null;
+ }
+ }
+
+ /**
+ * Return all configurations.
+ */
+ public InternalQueueConfiguration[] getConfigurations() {
+ final int count = this.configTracker.getTrackingCount();
+ InternalQueueConfiguration[] configurations = this.orderedConfigs;
+ if ( this.lastTrackerCount < count ) {
+ synchronized ( this ) {
+ configurations = this.orderedConfigs;
+ if ( this.lastTrackerCount < count ) {
+ final Object[] trackedConfigs = this.configTracker.getServices();
+ if ( trackedConfigs == null || trackedConfigs.length == 0 ) {
+ configurations = new InternalQueueConfiguration[0];
+ } else {
+ configurations = new InternalQueueConfiguration[trackedConfigs.length];
+ int i = 0;
+ for(final Object entry : trackedConfigs) {
+ final InternalQueueConfiguration config = (InternalQueueConfiguration)entry;
+ configurations[i] = config;
+ i++;
+ }
+ }
+ this.orderedConfigs = configurations;
+ this.lastTrackerCount = count;
+ }
+ }
+ }
+ return configurations;
+ }
+
+ /**
+ * Find the queue configuration for the job.
+ * This method only returns a configuration if one matches.
+ */
+ public InternalQueueConfiguration getQueueConfiguration(final JobEvent event) {
+ final InternalQueueConfiguration[] configurations = this.getConfigurations();
+ // check for queue name first
+ final String queueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
+ for(final InternalQueueConfiguration config : configurations) {
+ if ( config.isValid() ) {
+ if ( queueName != null ) {
+ if ( queueName.equals(config.getName()) ) {
+ event.queueName = queueName;
+ return config;
+ }
+ } else {
+ if ( config.match(event) ) {
+ return config;
+ }
+ }
+ }
+ }
+ return null;
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.console;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Date;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.event.impl.jobs.DefaultJobManager;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.Statistics;
+
+/**
+ * This is a webconsole plugin displaying the active queues, some statistics
+ * and the configurations.
+ * @since 3.0
+ */
+@Component
+@Service(value=javax.servlet.Servlet.class)
+@Properties({
+ @Property(name="felix.webconsole.label", value="slingevent", propertyPrivate=true),
+ @Property(name="felix.webconsole.title", value="Sling Eventing", propertyPrivate=true)
+})
+public class WebConsolePlugin extends HttpServlet {
+
+ private static final long serialVersionUID = -6983227434841706385L;
+
+ @Reference
+ private JobManager jobManager;
+
+ @Reference
+ private QueueConfigurationManager queueConfigManager;
+
+ /** Escape the output for html. */
+ private String escape(final String text) {
+ if ( text == null ) {
+ return "";
+ }
+ return text.replace("&", "&").replace("<", "<").replace(">", ">");
+ }
+
+ private Queue getQueue(final HttpServletRequest req) throws ServletException {
+ final String name = req.getParameter("queue");
+ if ( name != null ) {
+ for(final Queue q : this.jobManager.getQueues()) {
+ if ( name.equals(q.getName()) ) {
+ return q;
+ }
+ }
+ }
+ throw new ServletException("Wrong parameters");
+ }
+
+ @Override
+ protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
+ throws ServletException, IOException {
+ final String cmd = req.getParameter("action");
+ if ( "suspend".equals(cmd) ) {
+ final Queue q = this.getQueue(req);
+ q.suspend();
+ } else if ( "resume".equals(cmd) ) {
+ final Queue q = this.getQueue(req);
+ q.resume();
+ } else if ( "clear".equals(cmd) ) {
+ final Queue q = this.getQueue(req);
+ q.clear();
+ } else if ( "dropall".equals(cmd) ) {
+ final Queue q = this.getQueue(req);
+ q.removeAll();
+ } else {
+ throw new ServletException("Unknown command");
+ }
+ resp.sendRedirect(req.getContextPath() + req.getServletPath() + req.getPathInfo());
+ }
+
+ @Override
+ protected void doGet(final HttpServletRequest req, final HttpServletResponse res)
+ throws ServletException, IOException {
+ final PrintWriter pw = res.getWriter();
+
+ pw.println("<p class='statline ui-state-highlight'>Apache Sling Eventing</p>");
+
+ pw.println("<table class='nicetable'><tbody>");
+ Statistics s = this.jobManager.getStatistics();
+ pw.println("<tr><th colspan='2'>Overall Statistics</th></tr>");
+ pw.printf("<tr><td>Start Time</td><td>%s</td></tr>", formatDate(s.getStartTime()));
+ pw.printf("<tr><td>Last Activated</td><td>%s</td></tr>", formatDate(s.getLastActivatedJobTime()));
+ pw.printf("<tr><td>Last Finished</td><td>%s</td></tr>", formatDate(s.getLastFinishedJobTime()));
+ pw.printf("<tr><td>Queued Jobs</td><td>%s</td></tr>", s.getNumberOfQueuedJobs());
+ pw.printf("<tr><td>Active Jobs</td><td>%s</td></tr>", s.getNumberOfActiveJobs());
+ pw.printf("<tr><td>Jobs</td><td>%s</td></tr>", s.getNumberOfJobs());
+ pw.printf("<tr><td>Finished Jobs</td><td>%s</td></tr>", s.getNumberOfFinishedJobs());
+ pw.printf("<tr><td>Failed Jobs</td><td>%s</td></tr>", s.getNumberOfFailedJobs());
+ pw.printf("<tr><td>Cancelled Jobs</td><td>%s</td></tr>", s.getNumberOfCancelledJobs());
+ pw.printf("<tr><td>Processed Jobs</td><td>%s</td></tr>", s.getNumberOfProcessedJobs());
+ pw.printf("<tr><td>Average Processing Time</td><td>%s</td></tr>", formatTime(s.getAverageProcessingTime()));
+ pw.printf("<tr><td>Average Waiting Time</td><td>%s</td></tr>", formatTime(s.getAverageWaitingTime()));
+ pw.println("</tbody></table>");
+ pw.println("<br/>");
+
+ boolean isEmpty = true;
+ for(final Queue q : this.jobManager.getQueues()) {
+ isEmpty = false;
+ pw.println("<div class='ui-widget-header ui-corner-top buttonGroup'>");
+ pw.printf("<span style='float: left; margin-left: 1em'>Active JobQueue: %s %s</span>", escape(q.getName()),
+ q.isSuspended() ? "(SUSPENDED)" : "");
+ if ( q.isSuspended() ) {
+ this.printForm(pw, q, "Resume", "resume");
+ } else {
+ this.printForm(pw, q, "Suspend", "suspend");
+ }
+ this.printForm(pw, q, "Clear", "clear");
+ this.printForm(pw, q, "Drop All", "dropall");
+ pw.println("</div>");
+ pw.println("<table class='nicetable'><tbody>");
+
+ s = q.getStatistics();
+ final QueueConfiguration c = q.getConfiguration();
+ pw.println("<tr><th colspan='2'>Statistics</th><th colspan='2'>Configuration</th></tr>");
+ pw.printf("<tr><td>Start Time</td><td>%s</td><td>Type</td><td>%s</td></tr>", formatDate(s.getStartTime()), c.getType());
+ pw.printf("<tr><td>Last Activated</td><td>%s</td><td>Topics</td><td>%s</td></tr>", formatDate(s.getLastActivatedJobTime()), formatArray(c.getTopics()));
+ pw.printf("<tr><td>Last Finished</td><td>%s</td><td>Max Parallel</td><td>%s</td></tr>", formatDate(s.getLastFinishedJobTime()), c.getMaxParallel());
+ pw.printf("<tr><td>Queued Jobs</td><td>%s</td><td>Max Retries</td><td>%s</td></tr>", s.getNumberOfQueuedJobs(), c.getMaxRetries());
+ pw.printf("<tr><td>Active Jobs</td><td>%s</td><td>Retry Delay</td><td>%s ms</td></tr>", s.getNumberOfActiveJobs(), c.getRetryDelayInMs());
+ pw.printf("<tr><td>Jobs</td><td>%s</td><td>Priority</td><td>%s</td></tr>", s.getNumberOfJobs(), c.getPriority());
+ pw.printf("<tr><td>Finished Jobs</td><td>%s</td><td>Run Local</td><td>%s</td></tr>", s.getNumberOfFinishedJobs(), c.isLocalQueue());
+ pw.printf("<tr><td>Failed Jobs</td><td>%s</td><td>App Ids</td><td>%s</td></tr>", s.getNumberOfFailedJobs(), formatArray(c.getApplicationIds()));
+ pw.printf("<tr><td>Cancelled Jobs</td><td>%s</td><td colspan='2'> </td></tr>", s.getNumberOfCancelledJobs());
+ pw.printf("<tr><td>Processed Jobs</td><td>%s</td><td colspan='2'> </td></tr>", s.getNumberOfProcessedJobs());
+ pw.printf("<tr><td>Average Processing Time</td><td>%s</td><td colspan='2'> </td></tr>", formatTime(s.getAverageProcessingTime()));
+ pw.printf("<tr><td>Average Waiting Time</td><td>%s</td><td colspan='2'> </td></tr>", formatTime(s.getAverageWaitingTime()));
+ pw.printf("<tr><td>Status Info</td><td>%s</td></tr>", escape(q.getStatusInfo()));
+ pw.println("</tbody></table>");
+ pw.println("<br/>");
+ }
+ if ( isEmpty ) {
+ pw.println("<p>No active queues.</p>");
+ pw.println("<br/>");
+ }
+
+ pw.println("<p class='statline'>Apache Sling Eventing - Job Queue Configurations</p>");
+ this.printQueueConfiguration(req, pw, ((DefaultJobManager)this.jobManager).getMainQueueConfiguration());
+ final InternalQueueConfiguration[] configs = this.queueConfigManager.getConfigurations();
+ for(final InternalQueueConfiguration c : configs ) {
+ this.printQueueConfiguration(req, pw, c);
+ }
+ }
+
+ private void printQueueConfiguration(final HttpServletRequest req, final PrintWriter pw, final InternalQueueConfiguration c) {
+ pw.println("<div class='ui-widget-header ui-corner-top buttonGroup'>");
+ pw.printf("<span style='float: left; margin-left: 1em'>Job Queue Configuration: %s</span>%n",
+ escape(c.getName()));
+ pw.printf("<button id='edit' class='ui-state-default ui-corner-all' onclick='javascript:window.location=\"%s%s/configMgr/%s\";'>Edit</button>",
+ req.getContextPath(), req.getServletPath(), c.getPid());
+ pw.println("</div>");
+ pw.println("<table class='nicetable'><tbody>");
+ pw.println("<tr><th colspan='2'>Configuration</th></tr>");
+ pw.printf("<tr><td>Valid</td><td>%s</td></tr>", c.isValid());
+ pw.printf("<tr><td>Type</td><td>%s</td></tr>", c.getType());
+ pw.printf("<tr><td>Topics</td><td>%s</td></tr>", formatArray(c.getTopics()));
+ pw.printf("<tr><td>Max Parallel</td><td>%s</td></tr>", c.getMaxParallel());
+ pw.printf("<tr><td>Max Retries</td><td>%s</td></tr>", c.getMaxRetries());
+ pw.printf("<tr><td>Retry Delay</td><td>%s ms</td></tr>", c.getRetryDelayInMs());
+ pw.printf("<tr><td>Priority</td><td>%s ms</td></tr>", c.getPriority());
+ pw.printf("<tr><td>Run Local</td><td>%s ms</td></tr>", c.isLocalQueue());
+ pw.printf("<tr><td>App Ids</td><td>%s ms</td></tr>", formatArray(c.getApplicationIds()));
+ pw.printf("<tr><td>Ranking</td><td>%s ms</td></tr>", c.getRanking());
+
+ pw.println("</tbody></table>");
+ pw.println("<br/>");
+ }
+
+ /** TODO */
+ private String formatArray(final String[] array) {
+ if ( array == null || array.length == 0 ) {
+ return "";
+ }
+ return escape(Arrays.toString(array));
+ }
+
+ /** TODO */
+ private String formatDate(final long time) {
+ if ( time == -1 ) {
+ return "-";
+ }
+ final Date d = new Date(time);
+ return d.toString();
+ }
+
+ /** TODO */
+ private String formatTime(final long time) {
+ if ( time == 0 ) {
+ return "-";
+ }
+ if ( time < 1000 ) {
+ return time + " ms";
+ } else if ( time < 1000 * 60 ) {
+ return time / 1000 + " secs";
+ }
+ final long min = time / 1000 / 60;
+ final long secs = (time - min * 1000 * 60);
+ return min + " min " + secs / 1000 + " secs";
+ }
+
+ private void printForm(final PrintWriter pw,
+ final Queue q,
+ final String buttonLabel,
+ final String hiddenValue) {
+ pw.printf("<form method='POST' name='%s'><input type='hidden' name='action' value='%s'/>"+
+ "<input type='hidden' name='queue' value='%s'/>" +
+ "<button class='ui-state-default ui-corner-all' onclick='javascript:document.forms[\"%s\"].submit();'>" +
+ "%s</button></form>", hiddenValue, hiddenValue, q.getName(), hiddenValue, buttonLabel);
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import javax.jcr.Node;
+import javax.jcr.Property;
+import javax.jcr.PropertyIterator;
+import javax.jcr.PropertyType;
+import javax.jcr.RepositoryException;
+import javax.jcr.Value;
+import javax.jcr.ValueFactory;
+
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.jobs.JobStatusNotifier;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+
+
+/**
+ * Helper class defining some constants and utility methods.
+ */
+public abstract class JCRHelper {
+
+ /** The namespace prefix. */
+ public static final String EVENT_PREFIX = "slingevent:";
+
+ public static final String NODE_PROPERTY_TOPIC = "slingevent:topic";
+ public static final String NODE_PROPERTY_APPLICATION = "slingevent:application";
+ public static final String NODE_PROPERTY_CREATED = "slingevent:created";
+ public static final String NODE_PROPERTY_PROPERTIES = "slingevent:properties";
+ public static final String NODE_PROPERTY_PROCESSOR = "slingevent:processor";
+ public static final String NODE_PROPERTY_JOBID = "slingevent:id";
+ public static final String NODE_PROPERTY_FINISHED = "slingevent:finished";
+ public static final String NODE_PROPERTY_TE_EXPRESSION = "slingevent:expression";
+ public static final String NODE_PROPERTY_TE_DATE = "slingevent:date";
+ public static final String NODE_PROPERTY_TE_PERIOD = "slingevent:period";
+
+ public static final String EVENT_NODE_TYPE = "slingevent:Event";
+ public static final String JOB_NODE_TYPE = "slingevent:Job";
+ public static final String TIMED_EVENT_NODE_TYPE = "slingevent:TimedEvent";
+
+ /** The nodetype for newly created intermediate folders */
+ public static final String NODETYPE_FOLDER = "sling:Folder";
+
+ /** The nodetype for newly created folders */
+ public static final String NODETYPE_ORDERED_FOLDER = "sling:OrderedFolder";
+
+
+ /** List of ignored properties to write to the repository. */
+ private static final String[] IGNORE_PROPERTIES = new String[] {
+ EventUtil.PROPERTY_DISTRIBUTE,
+ EventUtil.PROPERTY_APPLICATION,
+ JobUtil.JOB_ID,
+ JobStatusNotifier.CONTEXT_PROPERTY_NAME
+ };
+
+ /** List of ignored prefixes to read from the repository. */
+ private static final String[] IGNORE_PREFIXES = new String[] {
+ JCRHelper.EVENT_PREFIX
+ };
+
+ /**
+ * Check if this property should be ignored
+ */
+ private static boolean ignoreProperty(final String name) {
+ for(final String prop : IGNORE_PROPERTIES) {
+ if ( prop.equals(name) ) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Add all java properties as properties to the node.
+ * If the name and the value of a map entry can easily converted into
+ * a repository property, it is directly added. All other java
+ * properties are stored in one binary property.
+ *
+ * @param node The node where all properties are added to
+ * @param event The event.
+ * @throws RepositoryException
+ */
+ public static void writeEventProperties(final Node node,
+ final Event event)
+ throws RepositoryException {
+ if ( event != null ) {
+ final String[] propNames = event.getPropertyNames();
+ if ( propNames != null && propNames.length > 0 ) {
+ // check which props we can write directly and
+ // which we need to write as a binary blob
+ final List<String> propsAsBlob = new ArrayList<String>();
+
+ for(final String name : propNames) {
+
+ if ( !ignoreProperty(name) ) {
+ // sanity check
+ final Object value = event.getProperty(name);
+ if ( value != null ) {
+ if ( !setProperty(name, value, node) ) {
+ propsAsBlob.add(name);
+ }
+ }
+ }
+ }
+ // write the remaining properties as a blob
+ if ( propsAsBlob.size() > 0 ) {
+ try {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeInt(propsAsBlob.size());
+ for(final String propName : propsAsBlob) {
+ oos.writeObject(propName);
+ try {
+ oos.writeObject(event.getProperty(propName));
+ } catch (IOException ioe) {
+ throw new RepositoryException("Unable to serialize property " + propName, ioe);
+ }
+ }
+ oos.close();
+ node.setProperty(JCRHelper.NODE_PROPERTY_PROPERTIES,
+ node.getSession().getValueFactory().createBinary(new ByteArrayInputStream(baos.toByteArray())));
+ } catch (IOException ioe) {
+ throw new RepositoryException("Unable to serialize event " + EventUtil.toString(event), ioe);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Read event properties from a repository node and create a property map (dictionary).
+ * As the properties might contain serialized java objects, a class loader can be specified
+ * for loading classes of the serialized java objects.
+ * @throws RepositoryException
+ * @throws ClassNotFoundException
+ */
+ public static Dictionary<String, Object> readEventProperties(final Node node,
+ final ClassLoader objectClassLoader,
+ final boolean forceLoad)
+ throws RepositoryException, ClassNotFoundException {
+ final Dictionary<String, Object> properties = new Hashtable<String, Object>();
+
+ // check the properties blob
+ if ( node.hasProperty(JCRHelper.NODE_PROPERTY_PROPERTIES) ) {
+ try {
+ final ObjectInputStream ois = new ObjectInputStream(node.getProperty(JCRHelper.NODE_PROPERTY_PROPERTIES).getBinary().getStream(),
+ objectClassLoader);
+ int length = ois.readInt();
+ for(int i=0;i<length;i++) {
+ final String key = (String)ois.readObject();
+ final Object value = ois.readObject();
+ properties.put(key, value);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ if ( !forceLoad ) {
+ throw cnfe;
+ }
+ } catch (java.io.InvalidClassException ice) {
+ if ( !forceLoad ) {
+ throw new ClassNotFoundException("Found invalid class.", ice);
+ }
+ } catch (IOException ioe) {
+ if ( !forceLoad ) {
+ throw new RepositoryException("Unable to deserialize event properties.", ioe);
+ }
+ }
+ }
+ // now all properties that have been set directly
+ final PropertyIterator pI = node.getProperties();
+ while ( pI.hasNext() ) {
+ final Property p = pI.nextProperty();
+ boolean ignore = p.getName().startsWith("jcr:");
+ if ( !ignore) {
+ int index = 0;
+ while ( !ignore && index < IGNORE_PREFIXES.length ) {
+ ignore = p.getName().startsWith(IGNORE_PREFIXES[index]);
+ index++;
+ }
+ }
+ if ( !ignore ) {
+ final String name = ISO9075.decode(p.getName());
+ if ( p.getDefinition().isMultiple() ) {
+ final Value[] values = p.getValues();
+ if ( values.length > 0 ) {
+ // get first value
+ final Object firstObject = getPropertyValue(values[0]);
+ final Object[] array;
+ if ( firstObject instanceof Boolean ) {
+ array = new Boolean[values.length];
+ } else if ( firstObject instanceof Calendar ) {
+ array = new Calendar[values.length];
+ } else if ( firstObject instanceof Double ) {
+ array = new Double[values.length];
+ } else if ( firstObject instanceof Long ) {
+ array = new Long[values.length];
+ } else if ( firstObject instanceof BigDecimal) {
+ array = new BigDecimal[values.length];
+ } else {
+ array = new String[values.length];
+ }
+ array[0] = firstObject;
+ int index = 1;
+ while ( index < values.length ) {
+ array[index] = getPropertyValue(values[index]);
+ index++;
+ }
+ properties.put(name, array);
+ }
+ } else {
+ final Value value = p.getValue();
+ final Object o = getPropertyValue(value);
+ properties.put(name, o);
+ }
+ }
+ }
+ return properties;
+ }
+
+ /**
+ * Return the converted repository property name
+ * @param name The java object property name
+ * @return The converted name or null if not possible.
+ */
+ public static String getNodePropertyName(final String name) {
+ // if name contains a colon, we can't set it as a property
+ if ( name.indexOf(':') != -1 ) {
+ return null;
+ }
+ return ISO9075.encode(name);
+ }
+
+ /**
+ * Return the converted repository property value
+ * @param valueFactory The value factory
+ * @param eventValue The event value
+ * @return The converted value or null if not possible
+ */
+ public static Value getNodePropertyValue(final ValueFactory valueFactory, final Object eventValue) {
+ final Value val;
+ if (eventValue instanceof Calendar) {
+ val = valueFactory.createValue((Calendar)eventValue);
+ } else if (eventValue instanceof Long) {
+ val = valueFactory.createValue((Long)eventValue);
+ } else if (eventValue instanceof Double) {
+ val = valueFactory.createValue(((Double)eventValue).doubleValue());
+ } else if (eventValue instanceof Boolean) {
+ val = valueFactory.createValue((Boolean) eventValue);
+ } else if (eventValue instanceof BigDecimal) {
+ val = valueFactory.createValue((BigDecimal) eventValue);
+ } else if (eventValue instanceof String) {
+ val = valueFactory.createValue((String)eventValue);
+ } else {
+ val = null;
+ }
+ return val;
+ }
+
+ /**
+ * Convert the value back to an object.
+ * @param value
+ * @return
+ * @throws RepositoryException
+ */
+ private static Object getPropertyValue(final Value value)
+ throws RepositoryException {
+ final Object o;
+ switch (value.getType()) {
+ case PropertyType.BOOLEAN:
+ o = value.getBoolean(); break;
+ case PropertyType.DATE:
+ o = value.getDate(); break;
+ case PropertyType.DOUBLE:
+ o = value.getDouble(); break;
+ case PropertyType.LONG:
+ o = value.getLong(); break;
+ case PropertyType.STRING:
+ o = value.getString(); break;
+ case PropertyType.DECIMAL:
+ o = value.getDecimal(); break;
+ default: // this should never happen - we convert to a string...
+ o = value.getString();
+ }
+ return o;
+ }
+
+ /**
+ * Try to set the java property as a property of the node.
+ * @param name
+ * @param value
+ * @param node
+ * @return
+ * @throws RepositoryException
+ */
+ private static boolean setProperty(String name, Object value, Node node)
+ throws RepositoryException {
+ final String propName = getNodePropertyName(name);
+ if ( propName == null ) {
+ return false;
+ }
+ final ValueFactory fac = node.getSession().getValueFactory();
+ // check for multi value
+ if ( value.getClass().isArray() ) {
+ final Object[] array = (Object[])value;
+ // now we try to convert each value
+ // and check if all converted values have the same type
+ final Value[] values = new Value[array.length];
+ int index = 0;
+ for(final Object v : array ) {
+ values[index] = getNodePropertyValue(fac, v);
+ if ( values[index] == null ) {
+ return false;
+ }
+ if ( index > 0 && !values[index-1].getClass().equals(values[index].getClass()) ) {
+ return false;
+ }
+ index++;
+ }
+ node.setProperty(propName, values);
+ return true;
+ }
+ final Value val = getNodePropertyValue(fac, value);
+ if ( val != null ) {
+ node.setProperty(propName, val);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This is an extended version of the object input stream which uses the
+ * thread context class loader.
+ */
+ private static class ObjectInputStream extends java.io.ObjectInputStream {
+
+ private ClassLoader classloader;
+
+ public ObjectInputStream(final InputStream in, final ClassLoader classLoader) throws IOException {
+ super(in);
+ this.classloader = classLoader;
+ }
+
+ /**
+ * @see java.io.ObjectInputStream#resolveClass(java.io.ObjectStreamClass)
+ */
+ @Override
+ protected Class<?> resolveClass(java.io.ObjectStreamClass classDesc) throws IOException, ClassNotFoundException {
+ if ( this.classloader != null ) {
+ return Class.forName(classDesc.getName(), true, this.classloader);
+ }
+ return super.resolveClass(classDesc);
+ }
+ }
+}
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRHelper.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java
URL: http://svn.apache.org/viewvc/sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java?rev=1021247&view=auto
==============================================================================
--- sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java (added)
+++ sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java Mon Oct 11 06:54:12 2010
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.jcr;
+
+import org.apache.sling.event.impl.jobs.JobEvent;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+
+/**
+ * This object encapsulates all information about a job.
+ */
+public class JCRJobEvent extends JobEvent {
+
+ private final PersistenceHandler handler;
+
+ public JCRJobEvent(final Event e, final PersistenceHandler handler) {
+ super(e, (String)e.getProperty(JobUtil.JOB_ID));
+ this.handler = handler;
+ }
+
+ @Override
+ public boolean lock() {
+ return this.handler.lock(this);
+ }
+
+ @Override
+ public void unlock() {
+ this.handler.unlock(this);
+ }
+
+ @Override
+ public void finished() {
+ this.handler.finished(this);
+ }
+
+ @Override
+ public boolean reschedule() {
+ return this.handler.reschedule(this);
+ }
+
+ @Override
+ public boolean remove() {
+ return this.handler.remove(this.uniqueId);
+ }
+}
\ No newline at end of file
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/branches/eventing-3.0/src/main/java/org/apache/sling/event/impl/jobs/jcr/JCRJobEvent.java
------------------------------------------------------------------------------
svn:mime-type = text/plain