You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2010/02/12 09:58:00 UTC

svn commit: r909303 [2/2] - in /synapse/trunk/java/modules: commons/ commons/src/main/java/org/apache/synapse/commons/evaluators/ commons/src/main/java/org/apache/synapse/commons/evaluators/config/ commons/src/main/java/org/apache/synapse/commons/execu...

Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,49 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.commons.executors;
+
+import java.util.List;
+
+/**
+ * This interface implements the algorith for determining the next internal
+ * queue for picking up the message. This class is created onece and initialized.
+ * This class should capture any runtime information about the queues since the
+ * MultiPriorityBlockingQueue doesn't hold any runtime state information about
+ * the queues.
+ *
+ * @param <E>
+ */
+public interface NextQueueAlgorithm<E> {
+
+    /**
+     * Initialized with the queues sorted according to the priority.
+     *
+     * @param queues list of queues
+     */
+    void init(List<InternalQueue<E>> queues);
+
+    /**
+     * Should return a queue based on some selection criteria and current
+     * state of the queues.
+     *
+     * @return the queue
+     */
+    InternalQueue<E> getNextQueue();    
+}

Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,101 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.commons.executors;
+
+import java.util.List;
+
+/**
+ * This is a priority based round robin algorithm for getting the next queue </p>
+ *
+ * <p>This algorithm works in cycles. Lets say we have queues with following priorities.
+ * 7, 5, 2 and assume we name the queues as 1, 2, 3 in the order. </p>
+ * <p>Here is how messages are picked in a single cycle
+ * 1, 1, 1, 1, 1, 1, 1 // all the messages for the queue with priority 1 are sent for this cycle
+ * 2, 2, 2, 2, 2,  // all the messages for the queue with priority 2 are sent for this cycle
+ * 3, 3 // all the messages with priority 2 are sent for this cycle</p>
+ *
+ * <p>This algorithm choose the queues in the above order if all the queues have messages at the
+ * point of selection. If a queue doesn't have messages it will skip the queue and move to the
+ * next. If none of the queues have messages it will return null.
+ */
+public class PRRNextQueueAlgorithm<E> implements NextQueueAlgorithm<E> {
+    
+    /**
+     * We hold a reference to the actual queue
+     */
+    private List<InternalQueue<E>> queues;
+
+    /**
+     * Number of queues, this is just to avoid the overhead of calculating
+     * this again and again
+     */
+    private int size = 0;
+
+    /**
+     * Current queue we are operating on
+     */
+    private int currentQueue = 0;
+
+    /**
+     * Number of messages sent from the current queue
+     */
+    private int currentCount = 0;
+
+    public InternalQueue<E> getNextQueue() {
+        InternalQueue<E> internalQueue = queues.get(currentQueue);
+
+        int priority = internalQueue.getPriority();
+
+        if (priority == currentCount || internalQueue.size() == 0) {
+            currentCount = 0;
+            // we need to move to the next queue not empty
+            int c = 0;
+            do {
+                if (currentQueue == queues.size() - 1) {
+                    currentQueue = 0;
+                } else {
+                    currentQueue++;
+                }
+
+                internalQueue = queues.get(currentQueue);
+
+                c++;
+                // we move forward until we find a non empty queue or everything is empty
+            } while (internalQueue.size() == 0 && c < size);
+
+            if (internalQueue.size() == 0) {
+                currentQueue = 0;
+                return null;
+            }
+        }
+
+        currentCount++;
+
+        /*log.info("Get the queue with the priority: " +
+                        internalQueue.getPriority());*/
+
+        return internalQueue;
+    }
+
+    public void init(List<InternalQueue<E>> queues) {
+        this.queues = queues;
+        size = queues.size();
+    }
+}

Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,282 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.commons.executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.axis2.transport.base.threads.NativeThreadFactory;
+
+import java.util.concurrent.*;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * This is the class used for executing the tasks with a given priority. It is backed by a
+ * BlockingQueue and a ThreadPoolExecutor. The BlockingQueue is a custom implementation which 
+ * has multiple internal queues for handling separate priorities.
+ */
+public class PriorityExecutor {
+    private final Log log = LogFactory.getLog(PriorityExecutor.class);    
+
+    /** Actual thread pool executor */
+    private ThreadPoolExecutor executor;
+
+    /** Name of the executor */
+    private String name = null;
+
+    /** Core threads count */
+    private int core = ExecutorConstants.DEFAULT_CORE;
+    /** Max thread count */
+    private int max = ExecutorConstants.DEFAULT_MAX;
+    /** Keep alive time for spare threads */
+    private int keepAlive = ExecutorConstants.DEFAULT_KEEP_ALIVE;
+    /** This will be executed before the Task is submitted  */
+    private BeforeExecuteHandler beforeExecuteHandler;
+    /** Queue used by the executor */
+    private MultiPriorityBlockingQueue<Runnable> queue;
+
+    /** this is used by the file based synapse xml configuration */
+    private String fileName;
+
+    /**
+     * Execute a given task with the priority specified.
+     *
+     * @param task task to be executed
+     * @param priority priority of the tast
+     */
+    public void execute(final Runnable task, int priority) {
+        Worker w = new Worker(task, priority);
+
+        if (beforeExecuteHandler != null) {
+            beforeExecuteHandler.beforeExecute(w);
+        }
+        // we are capturing all the exceptions to prevent threads from dying
+        executor.execute(w);
+    }
+
+
+    /**
+     * Initialize the executor by using the properties. Create the queues
+     * and ThreadPool executor.
+     */
+    public void init() {
+        if (queue == null) {
+            throw new IllegalStateException("Queue should be specified before initializing");
+        }
+
+        executor = new ThreadPoolExecutor(core, max, keepAlive, TimeUnit.SECONDS, queue,
+                new NativeThreadFactory(new ThreadGroup("executor-group"), "priority-worker"));
+
+        if (log.isDebugEnabled()) {
+            log.debug("Started the thread pool executor with threads, " +
+                    "core = " + core + " max = " + max +
+                    ", keep-alive = " + keepAlive);
+        }
+    }
+
+    /**
+     * Destroy the executor. 
+     */
+    public void destroy() {
+        if (log.isDebugEnabled()) {
+            log.debug("Shutting down thread pool executor");
+        }
+
+        executor.shutdown();
+
+        try {
+            executor.awaitTermination(100, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.error("Failed to Shut down Executor");        
+        }
+    }
+
+    /**
+     * Set the name of the executor
+     *
+     * @param name of the executor
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Get the name of the executor
+     *
+     * @return name of the executor
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Set a handler for execute before putting a worker in to the queues.
+     * User can set some properties to the worker at this point. This
+     * allows users to get more control over the queue selection algorithm.
+     * This is an optional configuration.
+     *
+     * @param beforeExecuteHandler an object implementing the BeforeExecuteHandler 
+     */
+    public void setBeforeExecuteHandler(
+            BeforeExecuteHandler beforeExecuteHandler) {
+        this.beforeExecuteHandler = beforeExecuteHandler;
+    }
+
+    /**
+     * Get the handler that is executed before the worker is put in to the queue
+     *
+     * @return an object of BeforeExecuteHandler 
+     */
+    public BeforeExecuteHandler getBeforeExecuteHandler() {
+        return beforeExecuteHandler;
+    }
+
+    /**
+     * Set the queue
+     *
+     * @param queue queue used for handling the priorities
+     */
+    public void setQueue(MultiPriorityBlockingQueue<Runnable> queue) {
+        this.queue = queue;
+    }
+
+    /**
+     * Get the queue
+     *
+     * @return queue used for handling multiple priorities
+     */
+    public MultiPriorityBlockingQueue<Runnable> getQueue() {
+        return queue;
+    }
+
+    /**
+     * Get the core number of threads
+     *
+     * @return core number of threads
+     */
+    public int getCore() {
+        return core;
+    }
+
+    /**
+     * Get the max threads
+     *
+     * @return max thread
+     */
+    public int getMax() {
+        return max;
+    }
+
+    /**
+     * Get the keep alive time for threads
+     *
+     * @return keep alive time for threads
+     */
+    public int getKeepAlive() {
+        return keepAlive;
+    }
+
+    /**
+     * Set the core number of threads
+     *
+     * @param core core number of threads
+     */
+    public void setCore(int core) {
+        this.core = core;
+    }
+
+    /**
+     * Set the max number of threads
+     *
+     * @param max max threads
+     */
+    public void setMax(int max) {
+        this.max = max;
+    }
+
+    /**
+     * Set the keep alive time for threads
+     *
+     * @param keepAlive keep alive threads
+     */
+    public void setKeepAlive(int keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    /**
+     * Get the file used to store this executor config
+     *
+     * @return file used for storing the config
+     */
+    public String getFileName() {
+        return fileName;
+    }
+
+    /**
+     * Set the file used to store the config
+     *
+     * @param fileName file name
+     */
+    public void setFileName(String fileName) {
+        this.fileName = fileName;
+    }
+
+    /**
+     * Private class for executing the tasks submited. This class is used for
+     * prevent the threads from dying in case of unhandled exceptions. Also
+     * this class implements the Importance for carrying the priority.     
+     */
+    private class Worker implements Runnable, Importance {
+        private Runnable runnable = null;
+
+        private Map<String, Object> properties = new HashMap<String, Object>();
+
+        private int priority = 1;
+
+        private Worker(Runnable runnable, int priority) {
+            this.priority = priority;
+            this.runnable = runnable;
+        }
+
+        public void run() {
+            try {
+                runnable.run();
+            } catch (Throwable e) {
+                log.error("Unhandled exception", e);
+            }
+        }
+
+        public int getPriority() {
+            return priority;
+        }
+
+        public void setPriority(int p) {
+            this.priority = p;
+        }
+
+        public void setProperty(String name, Object value) {
+            properties.put(name, value);
+        }
+
+        public Object getProperty(String name) {
+            return properties.get(name);
+        }
+    }
+}

Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,235 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.commons.executors.config;
+
+import org.apache.synapse.commons.executors.*;
+import org.apache.synapse.commons.executors.queues.FixedSizeQueue;
+import org.apache.synapse.commons.executors.queues.UnboundedQueue;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axis2.AxisFault;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+import javax.xml.namespace.QName;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+
+
+public class PriorityExecutorFactory {
+    private static Log log = LogFactory.getLog(PriorityExecutorFactory.class);
+
+    public static final QName NAME_ATT = new QName(ExecutorConstants.NAME);
+    public static final QName CLASS_ATT = new QName("class");
+    public static final QName ATT_NAME    = new QName(ExecutorConstants.NAME);
+    public static final QName ATT_VALUE   = new QName(ExecutorConstants.VALUE);
+    public static final QName SIZE_ATT   = new QName(ExecutorConstants.SIZE);
+    public static final QName PRIORITY_ATT   = new QName(ExecutorConstants.PRIORITY);
+
+    public static final QName IS_FIXED_ATT = new QName(ExecutorConstants.IS_FIXED_SIZE);
+
+    public static final QName BEFORE_EXECUTE_HANDLER =
+            new QName(ExecutorConstants.BEFORE_EXECUTE_HANDLER);
+
+    public static final QName NEXT_QUEUE_ATT = new QName(ExecutorConstants.NEXT_QUEUE);
+
+    public static final QName MAX_ATT = new QName(ExecutorConstants.MAX);
+    public static final QName CORE_ATT = new QName(ExecutorConstants.CORE);
+    public static final QName KEEP_ALIVE_ATT = new QName(ExecutorConstants.KEEP_ALIVE);
+
+    public static PriorityExecutor createExecutor(String namespace, OMElement e,
+                                                  boolean requireName) throws AxisFault {        
+        QName queuesQName = createQname(namespace, ExecutorConstants.QUEUES);
+        QName queueQName = createQname(namespace, ExecutorConstants.QUEUE);
+
+        QName threadsQName = createQname(namespace, ExecutorConstants.THREADS);
+
+        PriorityExecutor executor = new PriorityExecutor();
+
+        OMAttribute nameAtt = e.getAttribute(NAME_ATT);
+        if (nameAtt != null && !"".equals(nameAtt.getAttributeValue())) {
+            executor.setName(nameAtt.getAttributeValue());
+        } else if (requireName){
+            handlerException(ExecutorConstants.NAME +
+                    " is required for a " + ExecutorConstants.PRIORITY_EXECUTOR);
+        }
+
+        // set the handler for calling before the message is put in to the queue
+        OMAttribute handlerAtt = e.getAttribute(BEFORE_EXECUTE_HANDLER);
+        if (handlerAtt != null) {
+            BeforeExecuteHandler beh =
+                    createExecuteBeforeHandler(handlerAtt.getAttributeValue());
+            executor.setBeforeExecuteHandler(beh);
+        }
+
+        // create the queue configuration
+        OMElement queuesEle = e.getFirstChildWithName(queuesQName);
+        if (queuesEle != null) {
+            OMAttribute nextQueueAtt = queuesEle.getAttribute(NEXT_QUEUE_ATT);
+
+            NextQueueAlgorithm<Runnable> nqa = null;
+            if (nextQueueAtt != null) {
+                 nqa = createNextQueueAlgo(nextQueueAtt.getAttributeValue());
+            }
+
+            boolean isFixedSize = true;
+
+            OMAttribute fixedSizeAtt = queuesEle.getAttribute(IS_FIXED_ATT);
+            if (fixedSizeAtt != null) {
+                isFixedSize = Boolean.parseBoolean(fixedSizeAtt.getAttributeValue());
+            }
+
+            // create the queue configuration
+            List<InternalQueue<Runnable>> intQueues = createQueues(queueQName, queuesEle, isFixedSize);
+
+            MultiPriorityBlockingQueue queue =
+                    new MultiPriorityBlockingQueue(intQueues, isFixedSize, nqa);
+
+            executor.setQueue(queue);
+        } else {
+            handlerException("Queues configuration is mandatory");
+        }
+
+        OMElement threadsEle = e.getFirstChildWithName(threadsQName);
+        if (threadsEle != null) {
+            OMAttribute maxAttr = threadsEle.getAttribute(MAX_ATT);
+            if (maxAttr != null) {
+                executor.setMax(Integer.parseInt(maxAttr.getAttributeValue()));
+            }
+            OMAttribute coreAttr = threadsEle.getAttribute(CORE_ATT);
+            if (coreAttr != null) {
+                executor.setCore(Integer.parseInt(coreAttr.getAttributeValue()));
+            }
+            OMAttribute keepAliveAttr = threadsEle.getAttribute(KEEP_ALIVE_ATT);
+            if (keepAliveAttr != null) {
+                executor.setKeepAlive(Integer.parseInt(keepAliveAttr.getAttributeValue()));
+            }
+        }
+                
+        return executor;
+    }
+
+    private static List<InternalQueue<Runnable>> createQueues(
+            QName qQName, OMElement queuesEle, boolean isFixedSize) throws AxisFault {
+
+        List<InternalQueue<Runnable>> internalQueues =
+                new ArrayList<InternalQueue<Runnable>>();
+
+        Iterator it = queuesEle.getChildrenWithName(qQName);
+        while (it.hasNext()) {
+            OMElement qElement = (OMElement) it.next();
+            String size = qElement.getAttributeValue(SIZE_ATT);
+            String priority = qElement.getAttributeValue(PRIORITY_ATT);
+            int s = 0;
+            int p = 0;
+            if (priority != null) {
+                p = Integer.parseInt(priority);
+            } else {
+                handlerException("Priority must be specified");
+            }
+
+            if (size != null) {
+                s = Integer.parseInt(size);
+                isFixedSize = true;
+            } else if (isFixedSize) {
+                handlerException("Queues should have a " + ExecutorConstants.SIZE);
+            }
+
+            InternalQueue<Runnable> queue;
+            if (isFixedSize) {
+                queue = new FixedSizeQueue<Runnable>(p, s);
+            } else {
+                queue = new UnboundedQueue<Runnable>(p);
+            }
+
+            internalQueues.add(queue);
+        }
+
+        return internalQueues;
+    }
+
+    private static BeforeExecuteHandler createExecuteBeforeHandler(
+            String className) throws AxisFault {
+        try {
+            Class c = Class.forName(className);
+            Object o = c.newInstance();
+
+            if (o instanceof BeforeExecuteHandler) {
+                return (BeforeExecuteHandler) o;
+            } else {
+                handlerException("Before execute handler class, " +
+                        className +
+                        " is not type of BeforeExecuteHandler");
+            }
+        } catch (ClassNotFoundException e1) {
+            handlerException("Before execute handler class, " +
+                        className +
+                        " is not found");
+        } catch (IllegalAccessException e1) {
+            handlerException("Before execute handler class, " +
+                        className +
+                        " cannot be accessed");
+        } catch (InstantiationException e1) {
+            handlerException("Before execute handler class, " +
+                        className +
+                        " cannot be instantiated");
+        }
+        return null;
+    }
+
+    private static NextQueueAlgorithm<Runnable> createNextQueueAlgo(
+            String className) throws AxisFault {
+        try {
+            Class c = Class.forName(className);
+            Object o = c.newInstance();
+
+            if (o instanceof NextQueueAlgorithm) {
+                return (NextQueueAlgorithm<Runnable>) o;
+            } else {
+                handlerException("NextQueue algorithm class, " +
+                        className +
+                        " is not type of BeforeExecuteHandler");
+            }
+        } catch (ClassNotFoundException e1) {
+            handlerException("NextQueue algorithm class, " +
+                    className + " is not found");
+        } catch (IllegalAccessException e1) {
+            handlerException("NextQueue algorithm class, " +
+                    className + " cannot be accessed");
+        } catch (InstantiationException e1) {
+            handlerException("NextQueue algorithm class, " +
+                    className + " cannot be instantiated");
+        }
+        return null;
+    }
+
+    private static QName createQname(String namespace, String name) {
+        if (namespace == null) {
+            return new QName(name);
+        }
+        return new QName(namespace, name);
+    }
+
+    private static void handlerException(String message) throws AxisFault {
+        log.error(message);
+        throw new AxisFault(message);
+    }      
+}

Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,112 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.commons.executors.config;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.synapse.commons.executors.*;
+
+import javax.xml.namespace.QName;
+import java.util.List;
+
+public class PriorityExecutorSerializer {
+
+    public static OMElement serialize(OMElement parent,
+                                      PriorityExecutor executor, String namespace) {        
+        QName executorQName = createQname(namespace, ExecutorConstants.PRIORITY_EXECUTOR);
+        QName queuesQName = createQname(namespace, ExecutorConstants.QUEUES);
+        QName queueQName = createQname(namespace, ExecutorConstants.QUEUE);
+        QName threadsQName = createQname(namespace, ExecutorConstants.THREADS);
+
+
+        OMFactory fac = OMAbstractFactory.getOMFactory();
+        OMElement executorElement = fac.createOMElement(executorQName);
+        OMNamespace nullNS = fac.createOMNamespace("", "");
+
+        if (executor.getName() != null) {
+            executorElement.addAttribute(fac.createOMAttribute(ExecutorConstants.NAME,
+                    nullNS, executor.getName()));
+        }
+
+        if (executor.getBeforeExecuteHandler() != null) {
+            executorElement.addAttribute(fac.createOMAttribute(
+                    ExecutorConstants.BEFORE_EXECUTE_HANDLER, nullNS,
+                    executor.getBeforeExecuteHandler().getClass().getName()));
+        }
+
+        // create the queues configuration
+        MultiPriorityBlockingQueue queue = executor.getQueue();
+        NextQueueAlgorithm algo = queue.getNextQueue();
+        OMElement queuesEle = fac.createOMElement(queuesQName);
+
+        if (!(algo instanceof PRRNextQueueAlgorithm)) {
+            queuesEle.addAttribute(fac.createOMAttribute(ExecutorConstants.NEXT_QUEUE, nullNS,
+                    algo.getClass().getName()));            
+        }
+
+        if (!queue.isFixedSizeQueues()) {
+            queuesEle.addAttribute(fac.createOMAttribute(ExecutorConstants.IS_FIXED_SIZE,
+                    nullNS, Boolean.toString(false)));
+        }
+
+        List<InternalQueue> intQueues = queue.getQueues();
+        for (InternalQueue intQueue : intQueues) {
+            OMElement queueEle = fac.createOMElement(queueQName);
+
+            if (queue.isFixedSizeQueues()) {
+                queueEle.addAttribute(fac.createOMAttribute(ExecutorConstants.SIZE, nullNS,
+                        Integer.toString(intQueue.getCapacity())));
+            }
+
+            queueEle.addAttribute(fac.createOMAttribute(ExecutorConstants.PRIORITY, nullNS,
+                    Integer.toString(intQueue.getPriority())));
+
+            queuesEle.addChild(queueEle);
+        }
+        executorElement.addChild(queuesEle);
+
+        // create the Threads configuration
+        OMElement threadsEle = fac.createOMElement(threadsQName);
+        threadsEle.addAttribute(fac.createOMAttribute(
+                ExecutorConstants.MAX, nullNS, Integer.toString(executor.getMax())));
+        threadsEle.addAttribute(fac.createOMAttribute(
+                ExecutorConstants.CORE, nullNS, Integer.toString(executor.getCore())));
+        threadsEle.addAttribute(fac.createOMAttribute(
+                ExecutorConstants.KEEP_ALIVE, nullNS, Integer.toString(executor.getKeepAlive())));
+
+        executorElement.addChild(threadsEle);
+
+        if (parent != null) {
+            parent.addChild(executorElement);
+        }
+
+        return executorElement;
+    }
+
+    private static QName createQname(String namespace, String name) {
+        if (namespace == null) {
+            return new QName(name);
+        }
+        return new QName(namespace, name, "syn");
+    }
+}
+

Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,188 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.commons.executors.queues;
+
+import org.apache.synapse.commons.executors.InternalQueue;
+
+import java.util.concurrent.locks.Condition;
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Collection;
+
+/**
+ * A bounded queue implementation for internal queues. This queue is backed by an
+ * fixed size array.
+ *
+ * @param <E> Should implement the Importance interface
+ */
+public class FixedSizeQueue<E> extends AbstractQueue<E> implements InternalQueue<E> {
+
+    /**
+     * Priority of this queue
+     */
+    private int priority;    
+
+    /**
+     * A waiting queue when this queue is full
+     */
+    private Condition notFullCond;
+
+    /**
+     * Array holding the queues
+     */
+    private E[] array;
+
+    /**
+     * Capacity of the queue
+     */
+    private int capacity;
+
+    /**
+     * Number of elements in the queue
+     */
+    private int count = 0;
+
+    /**
+     * Head of the queue
+     */
+    private int head;
+
+    /**
+     * Tail of the queue
+     */
+    private int tail;
+    
+
+    public FixedSizeQueue(int priority, int capacity) {
+        this.priority = priority;        
+        this.capacity = capacity;
+
+        array = (E[]) new Object[capacity];
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int p) {
+        this.priority = p;
+    }    
+
+    public Condition getNotFullCond() {
+        return notFullCond;
+    }
+
+    public void setNotFullCond(Condition notFullCond) {
+        this.notFullCond = notFullCond;
+    }
+
+    public Iterator<E> iterator() {return null;}
+
+    public int size() {
+        return count;
+    }
+
+    public String toString() {
+        return super.toString() + this.priority;
+    }
+
+    public boolean offer(E e) {
+        if (count == array.length) {
+            return false;
+        } else {
+            insert(e);
+            return true;
+        }
+    }
+
+    public E poll() {
+        if (count == 0)
+            return null;
+        return get();
+    }
+
+    public E peek() {
+        return (count == 0) ? null : array[head];
+    }
+
+    public int remainingCapacity() {
+        return capacity - count;        
+    }
+
+    public int drainTo(Collection<? super E> c) {
+        final E[] items = this.array;
+        int i = head;
+        int n = 0;
+        int max = count;
+        while (n < max) {
+            c.add(items[i]);
+            items[i] = null;
+            i = increment(i);
+            n++;
+        }
+        if (n > 0) {
+            count = 0;
+            tail = 0;
+            head = 0;
+
+        }
+        return n;
+    }
+
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        final E[] items = this.array;
+        int i = head;
+        int n = 0;
+        int max = (maxElements < count) ? maxElements : count;
+        while (n < max) {
+            c.add(items[i]);
+            items[i] = null;
+            i = increment(i);
+            n++;
+        }
+        if (n > 0) {
+            count -= n;
+            head = i;
+        }
+        return n;
+    }
+
+    public int getCapacity() {
+        return capacity;
+    }
+
+    private int increment(int i) {
+        return (++i == array.length)? 0 : i;
+    }
+
+    private void insert(E e) {
+        array[tail] = e;
+        tail = increment(tail);
+        count++;
+    }
+
+    private E get() {
+        E e = array[head];
+        array[head] = null;
+        head = increment(head);
+        count--;
+        return e;
+    }
+}

Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.commons.executors.queues;
+
+import org.apache.synapse.commons.executors.InternalQueue;
+
+import java.util.*;
+import java.util.concurrent.locks.Condition;
+
+/**
+ * An unbounded queue
+ * @param <E>
+ */
+public class UnboundedQueue<E> extends AbstractQueue<E> implements InternalQueue<E> {
+
+    private List<E> elements = new ArrayList<E>();
+
+     /**
+     * Priority of this queue
+     */
+    private int priority;
+    
+    /**
+     * A waiting queue when this queue is full
+     */
+    private Condition notFullCond;
+
+    public UnboundedQueue(int priority) {
+        this.priority = priority;        
+    }
+
+    public Iterator<E> iterator() {
+        return elements.iterator();
+    }
+
+    public int size() {
+        return elements.size();
+    }
+
+    public boolean offer(E e) {
+        return elements.add(e);
+    }
+
+    public E poll() {
+        return elements.remove(elements.size() - 1);
+    }
+
+    public E peek() {
+        return elements.get(elements.size() - 1);
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public void setPriority(int p) {
+        this.priority = p;
+    }    
+
+    public Condition getNotFullCond() {
+        return notFullCond;
+    }
+
+    public void setNotFullCond(Condition condition) {
+        this.notFullCond = condition;
+    }
+
+    public int drainTo(Collection<? super E> c) {
+        int count = elements.size();
+        c.addAll(elements);
+        elements.clear();
+        return count;
+    }
+
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (maxElements >= elements.size()) {
+            return drainTo(c);
+        } else {
+            elements.subList(elements.size() - maxElements - 1, elements.size());
+            return maxElements;
+        }
+    }
+
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    public int getCapacity() {
+        return Integer.MAX_VALUE;
+    }
+}

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java Fri Feb 12 08:57:58 2010
@@ -26,6 +26,7 @@
 import org.apache.synapse.*;
 import org.apache.synapse.eventing.SynapseEventSource;
 import org.apache.synapse.commons.datasource.DataSourceHelper;
+import org.apache.synapse.commons.executors.PriorityExecutor;
 import org.apache.synapse.config.xml.MediatorFactoryFinder;
 import org.apache.synapse.config.xml.endpoints.XMLToEndpointMapper;
 import org.apache.synapse.core.SynapseEnvironment;
@@ -116,6 +117,11 @@
     private List<SynapseObserver> observers = new ArrayList<SynapseObserver>();
 
     /**
+     * Executors for executing sequences with priorities
+     */
+    private Map<String, PriorityExecutor> executors = new HashMap<String, PriorityExecutor>();
+
+    /**
      * Add a named sequence into the local registry. If a sequence already exists by the specified
      * key a runtime exception is thrown.
      *
@@ -1022,6 +1028,11 @@
                 stp.init(se);
             }
         }
+
+        // initialize sequence executors
+        for (PriorityExecutor executor : getPriorityExecutors().values()) {
+            executor.init();
+        }
     }
 
     private void handleException(String msg) {
@@ -1089,6 +1100,33 @@
         return Collections.unmodifiableList(observers);
     }
 
+    /**
+     * Add an executor
+     * @param name name of the executor
+     * @param executor executor
+     */
+    public void addPriorityExecutor(String name, PriorityExecutor executor) {
+        executors.put(name, executor);
+    }
+
+    /**
+     * Get the executors map
+     * @return exectors map, stored as name of executor and executor
+     */
+    public Map<String, PriorityExecutor> getPriorityExecutors() {
+        return executors;
+    }
+
+    /**
+     * Removes an executor from the configuration
+     * 
+     * @param name name of the executor
+     * @return removed executor
+     */
+    public PriorityExecutor removeExecutor(String name) {
+        return executors.remove(name);        
+    }
+
     private void assertAlreadyExists(String key, String type) {
 
         if (key == null || "".equals(key)) {

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorFactory.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorFactory.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,65 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.mediators.builtin.EnqueueMediator;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+
+import javax.xml.namespace.QName;
+
+
+public class EnqueueMediatorFactory extends AbstractMediatorFactory{
+    public static final QName ENQUEUE_Q = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "enqueue");
+    public static final QName SEQUENCE_ATT = new QName("sequence");
+    public static final QName PRIORITY_ATT = new QName("priority");
+    public static final QName QUEUE_ATT = new QName("executor");
+
+    public Mediator createMediator(OMElement elem) {
+        EnqueueMediator mediator = new EnqueueMediator();
+
+        OMAttribute seqAtt = elem.getAttribute(SEQUENCE_ATT);
+        if (seqAtt != null && !"".equals(seqAtt.getAttributeValue())) {
+            mediator.setSequenceName(seqAtt.getAttributeValue());
+        } else {
+            handleException("sequence is a required attribue");
+        }
+
+        OMAttribute priorityAtt = elem.getAttribute(PRIORITY_ATT);
+        if (priorityAtt != null && !"".equals(priorityAtt.getAttributeValue())) {
+            mediator.setPriority(Integer.parseInt(priorityAtt.getAttributeValue()));
+        }
+
+        OMAttribute queueAtt = elem.getAttribute(QUEUE_ATT);
+        if (queueAtt != null && !"".equals(queueAtt.getAttributeValue())) {
+            mediator.setExecutorName(queueAtt.getAttributeValue());
+        } else {
+            handleException("Queue is a required attribue");
+        }
+
+        return mediator;
+    }
+
+    public QName getTagQName() {
+        return ENQUEUE_Q;
+    }
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorSerializer.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorSerializer.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorSerializer.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,65 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.mediators.builtin.EnqueueMediator;
+
+
+public class EnqueueMediatorSerializer extends AbstractMediatorSerializer{
+    public OMElement serializeMediator(OMElement parent, Mediator m) {
+        assert m instanceof EnqueueMediator :
+                "Unsupported mediator passed in for serialization : " + m.getType();
+
+        EnqueueMediator mediator = (EnqueueMediator) m;
+        OMElement enqueue = fac.createOMElement("enqueue", synNS);
+        saveTracingState(enqueue, mediator);
+
+        if (mediator.getExecutorName() != null) {
+            enqueue.addAttribute(fac.createOMAttribute(
+                    "executor", nullNS, mediator.getExecutorName()));
+        } else {
+            handleException("Invalid enqueue mediator. queue is required");
+        }
+
+        if (mediator.getSequenceName() != null) {
+            enqueue.addAttribute(fac.createOMAttribute(
+                    "sequence", nullNS, mediator.getSequenceName()));
+        }  else {
+            handleException("Invalid enqueue mediator. sequence is required");
+        }
+
+        if (mediator.getPriority() != 1) {
+            enqueue.addAttribute(fac.createOMAttribute(
+                    "priority", nullNS, mediator.getPriority() + ""));
+        }
+
+        if (parent != null) {
+            parent.addChild(enqueue);
+        }
+
+        return enqueue;
+    }
+
+    public String getMediatorClassName() {
+        return EnqueueMediator.class.getName();
+    }
+}

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java Fri Feb 12 08:57:58 2010
@@ -73,7 +73,8 @@
         CacheMediatorFactory.class,
         CalloutMediatorFactory.class,
         EventPublisherMediatorFactory.class,
-        TransactionMediatorFactory.class
+        TransactionMediatorFactory.class,
+        EnqueueMediatorFactory.class
     };
 
     private final static MediatorFactoryFinder instance  = new MediatorFactoryFinder();

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java Fri Feb 12 08:57:58 2010
@@ -60,7 +60,8 @@
         CacheMediatorSerializer.class,
         CalloutMediatorSerializer.class,
         EventPublisherMediatorSerializer.class,
-        TransactionMediatorSerializer.class
+        TransactionMediatorSerializer.class,
+        EnqueueMediatorSerializer.class
     };
 
     private final static MediatorSerializerFinder instance = new MediatorSerializerFinder();

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationBuilder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationBuilder.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationBuilder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationBuilder.java Fri Feb 12 08:57:58 2010
@@ -25,6 +25,7 @@
 import org.apache.synapse.Mediator;
 import org.apache.synapse.Startup;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.commons.executors.PriorityExecutor;
 import org.apache.synapse.config.Entry;
 import org.apache.synapse.config.SynapseConfigUtils;
 import org.apache.synapse.config.SynapseConfiguration;
@@ -72,6 +73,7 @@
     public static final String LOCAL_ENTRY_DIR     = "local-entries";
     public static final String TASKS_DIR           = "tasks";
     public static final String EVENTS_DIR          = "event-sources";
+    public static final String EXECUTORS_DIR       = "priority-executors";
 
     public static final String REGISTRY_FILE       = "registry.xml";
 
@@ -116,6 +118,7 @@
         createProxyServices(synapseConfig, root);
         createTasks(synapseConfig, root);
         createEventSources(synapseConfig, root);
+        createExecutors(synapseConfig, root);
 
         return synapseConfig;
     }
@@ -276,6 +279,26 @@
         }
     }
 
+    private static void createExecutors(SynapseConfiguration synapseConfig, String rootDirPath)
+            throws XMLStreamException {
+
+        File eventsDir = new File(rootDirPath, EXECUTORS_DIR);
+        if (eventsDir.exists()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Loading event sources from : " + eventsDir.getPath());
+            }
+            File[] events = eventsDir.listFiles(filter);
+            for (File file : events) {
+                try {
+                    OMElement document = parseFile(file);
+                    PriorityExecutor executor = SynapseXMLConfigurationFactory.
+                            defineExecutor(synapseConfig, document);
+                    executor.setFileName(file.getName());
+                } catch (FileNotFoundException ignored) {}
+           }
+        }
+    }
+
     private static OMElement parseFile(File file)
             throws FileNotFoundException, XMLStreamException {
         InputStream is = new FileInputStream(file);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java Fri Feb 12 08:57:58 2010
@@ -28,6 +28,8 @@
 import org.apache.synapse.eventing.SynapseEventSource;
 import org.apache.synapse.Startup;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.commons.executors.config.PriorityExecutorSerializer;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.endpoints.AbstractEndpoint;
 import org.apache.synapse.mediators.base.SequenceMediator;
@@ -86,6 +88,7 @@
             serializeEventSources(synapseConfig.getEventSources(), definitions);
             serializeTasks(synapseConfig.getStartups(), definitions);
             serializeLocalRegistryValues(synapseConfig.getLocalRegistry().values(), definitions);
+            serializeExecutors(synapseConfig.getPriorityExecutors().values(), definitions);
 
             // Now serialize the content to synapse.xml
             serializeSynapseXML(definitions);
@@ -145,6 +148,12 @@
             throw new Exception("Error while creating the directory for tasks : " +
                     tasksDir.getAbsolutePath());
         }
+
+        File executorDir = new File(rootDirectory, MultiXMLConfigurationBuilder.EXECUTORS_DIR);
+        if (!executorDir.exists() && !executorDir.mkdirs()) {
+            throw new Exception("Error while creating the directory for tasks : " +
+                    executorDir.getAbsolutePath());
+        }
     }
 
     /**
@@ -170,6 +179,7 @@
         Collection<SynapseEventSource> eventSources = synapseConfig.getEventSources();
         Collection<Startup> tasks = synapseConfig.getStartups();
         Collection localEntries = synapseConfig.getLocalRegistry().values();
+        Collection<PriorityExecutor> executors = synapseConfig.getPriorityExecutors().values();
 
         for (ProxyService service : proxyServices) {
             if (service.getFileName() == null) {
@@ -217,6 +227,11 @@
             }
         }
 
+        for (PriorityExecutor executor : executors) {
+            PriorityExecutorSerializer.serialize(definitions, executor,
+                    SynapseConstants.SYNAPSE_NAMESPACE);
+        }
+
         serializeSynapseXML(definitions);
     }
 
@@ -488,4 +503,30 @@
         }
     }
 
+    private void serializeExecutors(Collection<PriorityExecutor> executors,
+                                       OMElement parent) throws Exception {
+        for (PriorityExecutor source : executors) {
+            serializeExecutor(source, parent);
+        }
+    }
+
+    private OMElement serializeExecutor(PriorityExecutor source, OMElement parent) throws Exception {
+        File executorDir = new File(rootDirectory, MultiXMLConfigurationBuilder.EXECUTORS_DIR);
+        if (!executorDir.exists() && !executorDir.mkdirs()) {
+            throw new Exception("Error while creating the directory for executors : " +
+                    executorDir.getAbsolutePath());
+        }
+
+        OMElement eventDirElem = PriorityExecutorSerializer.serialize(null, source,
+                SynapseConstants.SYNAPSE_NAMESPACE);
+
+        if (source.getFileName() != null) {
+            File eventSrcFile = new File(executorDir, source.getFileName());
+            writeToFile(eventDirElem, eventSrcFile);
+        } else if (parent != null) {
+            parent.addChild(eventDirElem);
+        }
+
+        return eventDirElem;
+    }
 }
\ No newline at end of file

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java Fri Feb 12 08:57:58 2010
@@ -26,6 +26,8 @@
 import org.apache.synapse.Startup;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
 import org.apache.synapse.aspects.AspectConfiguration;
 import org.apache.synapse.config.Entry;
 import org.apache.synapse.config.SynapseConfigUtils;
@@ -37,6 +39,7 @@
 import org.apache.synapse.eventing.SynapseEventSource;
 import org.apache.synapse.mediators.base.SequenceMediator;
 import org.apache.synapse.registry.Registry;
+import org.apache.axis2.AxisFault;
 
 import javax.xml.namespace.QName;
 import java.util.Iterator;
@@ -86,6 +89,8 @@
                     defineRegistry(config, elt);
                 } else if (XMLConfigConstants.EVENT_SOURCE_ELT.equals(elt.getQName())) {
                     defineEventSource(config, elt);
+                } else if (XMLConfigConstants.EXECUTOR_ELT.equals(elt.getQName())) {
+                    defineExecutor(config,  elt);
                 } else if (StartupFinder.getInstance().isStartup(elt.getQName())) {
                     defineStartup(config, elt);
                 } else {
@@ -179,6 +184,19 @@
         return eventSource;
     }
 
+    public static PriorityExecutor defineExecutor(SynapseConfiguration config,
+                                                       OMElement elem) {
+        PriorityExecutor executor = null;
+        try {
+            executor = PriorityExecutorFactory.createExecutor(
+                XMLConfigConstants.SYNAPSE_NAMESPACE, elem, true);
+        } catch (AxisFault axisFault) {
+            handleException("Failed to create the priority-executor configuration");
+        }
+        config.addPriorityExecutor(executor.getName(), executor);
+        return executor;
+    }
+
     private static void handleException(String msg) {
         log.error(msg);
         throw new SynapseException(msg);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java Fri Feb 12 08:57:58 2010
@@ -20,10 +20,12 @@
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.xml.endpoints.EndpointSerializer;
 import org.apache.synapse.config.xml.eventing.EventSourceSerializer;
+import org.apache.synapse.commons.executors.config.PriorityExecutorSerializer;
 import org.apache.synapse.core.axis2.ProxyService;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.Startup;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.commons.executors.PriorityExecutor;
 import org.apache.synapse.eventing.SynapseEventSource;
 
 public class SynapseXMLConfigurationSerializer implements ConfigurationSerializer {
@@ -100,6 +102,8 @@
         // handle startups
         serializeStartups(definitions, synCfg.getStartups());
 
+        // Executors
+        serializeExecutors(definitions, synCfg.getPriorityExecutors());
         return definitions;
     }
 
@@ -139,6 +143,18 @@
         }
     }
 
+    private static void serializeExecutors(OMElement definitions,
+                                           Map<String, PriorityExecutor> executors) {
+        for (Object o : executors.keySet()) {
+            if (o instanceof String) {
+                String key = (String) o;
+                PriorityExecutor executor = executors.get(key);
+                PriorityExecutorSerializer.serialize(definitions, executor,
+                        XMLConfigConstants.SYNAPSE_NAMESPACE);
+            }
+        }
+    }
+
     private static void handleException(String msg) {
         log.error(msg);
         throw new SynapseException(msg);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java Fri Feb 12 08:57:58 2010
@@ -65,6 +65,7 @@
     public static final String NULL_NAMESPACE = "";
     public static final Object QUARTZ_QNAME   =
         new QName("http://www.opensymphony.com/quartz/JobSchedulingData", "quartz");
+    public static final QName EXECUTOR_ELT = new QName(SYNAPSE_NAMESPACE, "priority-executor");
 
 	/** The Trace attribute name, for proxy services, sequences */
 	public static final String TRACE_ATTRIB_NAME = "trace";

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/executors/SequenceWorker.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/executors/SequenceWorker.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/executors/SequenceWorker.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/executors/SequenceWorker.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,52 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.executors;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SequenceWorker implements Runnable {
+
+    private static final Log log = LogFactory.getLog(SequenceWorker.class);
+
+    private MessageContext messageContext = null;
+
+    private Mediator sequence = null;
+
+    public SequenceWorker(MessageContext messageContext,
+                          Mediator sequence) {
+        this.messageContext = messageContext;
+        this.sequence = sequence;
+    }
+
+    public void run() {
+        try {
+            sequence.mediate(messageContext);
+        } catch (Throwable e) {
+            String msg = "Exception occurred while trying to execute the sequence ";
+            if (sequence instanceof SequenceMediator) {
+                log.error(msg + ((SequenceMediator)sequence).getName());
+            }
+        }
+    }    
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/EnqueueMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/EnqueueMediator.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/EnqueueMediator.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/EnqueueMediator.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,108 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.mediators.builtin;
+
+import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseLog;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.executors.SequenceWorker;
+
+/**
+ * This mediator execute a given sequence with a given priority.
+ *
+ * It accepts the priority as and argument. The executor used for executing this
+ * sequence should support this priority. If it doesn't support this priority it
+ * executor can throw exceptions.
+ */
+public class EnqueueMediator extends AbstractMediator {
+    private String executorName = null;
+
+    private int priority = 1;
+
+    private String sequenceName = null;
+
+    public boolean mediate(MessageContext synCtx) {
+        SynapseLog log = getLog(synCtx);
+        if (log.isTraceOrDebugEnabled()) {
+            log.traceOrDebug("Start: enqueue mediator");
+        }
+
+        assert executorName != null : "executor name shouldn't be null";
+
+        PriorityExecutor executor = synCtx.getConfiguration().
+                getPriorityExecutors().get(executorName);
+        if (executor == null) {
+            handleException("executor cannot be found for the name : " + executorName, synCtx);
+            return false;
+        }
+
+
+        Mediator m = synCtx.getSequence(sequenceName);
+        if (m != null && m instanceof SequenceMediator) {
+
+            SequenceWorker worker = new SequenceWorker(synCtx, m);
+
+            executor.execute(worker, priority);
+
+            if (log.isTraceOrDebugEnabled()) {
+                log.traceOrDebug("End: enqueue mediator");
+            }
+
+            // with the nio transport, this causes the listener not to write a 202
+            // Accepted response, as this implies that Synapse does not yet know if
+            // a 202 or 200 response would be written back.
+            ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext().setProperty(
+                    org.apache.axis2.Constants.RESPONSE_WRITTEN, "SKIP");
+
+            return true;
+        } else {
+            handleException("Sequence cannot be found : " + sequenceName, synCtx);
+            return false;
+        }               
+    }
+
+    public String getExecutorName() {
+        return executorName;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public String getSequenceName() {
+        return sequenceName;
+    }
+
+    public void setExecutorName(String executorName) {
+        this.executorName = executorName;
+    }
+
+    public void setPriority(int priority) {
+        this.priority = priority;
+    }
+
+    public void setSequenceName(String sequenceName) {
+        this.sequenceName = sequenceName;
+    }
+}

Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java Fri Feb 12 08:57:58 2010
@@ -19,6 +19,8 @@
 package org.apache.synapse.transport.nhttp;
 
 import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
@@ -45,10 +47,20 @@
 import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
 import org.apache.http.params.HttpProtocolParams;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.commons.executors.ExecutorConstants;
+import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
+import org.apache.synapse.commons.evaluators.Parser;
+import org.apache.synapse.commons.evaluators.EvaluatorException;
+import org.apache.synapse.commons.evaluators.EvaluatorConstants;
 
 import javax.net.ssl.SSLContext;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.namespace.QName;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
@@ -98,6 +110,10 @@
     private int state = BaseConstants.STOPPED;
     /** The ServerHandler */
     private ServerHandler handler = null;
+    /** This will execute the requests based on calculate priority */
+    private PriorityExecutor executor = null;
+    /** parser for calculating the priority of incoming messages */
+    private Parser parser = null;
 
     protected IOEventDispatch getEventDispatch(
         NHttpServiceHandler handler, SSLContext sslContext, 
@@ -182,6 +198,63 @@
         mbeanSupport
             = new TransportMBeanSupport(this, "nio-http" + (sslContext == null ? "" : "s"));
         mbeanSupport.register();
+
+        // create the priority based executor and parser
+        param = transprtIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
+        if (param != null && param.getValue() != null) {
+            createPriorityConfiguration(param.getValue().toString());
+        }
+    }
+
+    private void createPriorityConfiguration(String fileName) throws AxisFault {
+        OMElement definitions = null;
+        try {
+            FileInputStream fis = new FileInputStream(fileName);
+            definitions = new StAXOMBuilder(fis).getDocumentElement();
+            definitions.build();
+        } catch (FileNotFoundException e) {
+            handleException("Priority configuration file cannot be found : " + fileName, e);
+        } catch (XMLStreamException e) {
+            handleException("Error parsing priority configuration xml file " + fileName, e);
+        }
+
+        assert definitions != null;
+
+        OMElement executorElem = definitions.getFirstChildWithName(
+                new QName(ExecutorConstants.PRIORITY_EXECUTOR));
+
+        if (executorElem == null) {
+            handleException(ExecutorConstants.PRIORITY_EXECUTOR +
+                    " configuration is mandatory for priority based routing");
+        }
+
+        executor = PriorityExecutorFactory.createExecutor(null, executorElem, false);
+        OMElement conditionsElem = definitions.getFirstChildWithName(
+                new QName(EvaluatorConstants.CONDITIONS));
+        if (conditionsElem == null) {
+            handleException("Conditions configuration is mandatory for priority based routing");
+        }
+
+        executor.init();
+
+        assert conditionsElem != null;
+        OMAttribute defPriorityAttr = conditionsElem.getAttribute(
+                new QName(EvaluatorConstants.DEFAULT_PRIORITY));
+        if (defPriorityAttr != null) {
+            parser = new Parser(Integer.parseInt(defPriorityAttr.getAttributeValue()));
+        } else {
+            parser = new Parser();
+        }
+
+        try {
+            parser.init(conditionsElem);
+        } catch (EvaluatorException e) {
+            handleException("Invalid " + EvaluatorConstants.CONDITIONS +
+                    " configuration for priority based mediation", e);
+        }
+
+        log.info("Created a priority based executor from the configuration: " +
+                fileName);
     }
 
     /**
@@ -272,7 +345,7 @@
             addToServiceURIMap((AxisService) obj);
         }
         
-        handler = new ServerHandler(cfgCtx, params, sslContext != null, metrics);
+        handler = new ServerHandler(cfgCtx, params, sslContext != null, metrics, parser, executor);
         final IOEventDispatch ioEventDispatch = getEventDispatch(handler,
                 sslContext, sslIOSessionHandler, params);
         state = BaseConstants.STARTED;
@@ -549,6 +622,11 @@
         throw new AxisFault(msg, e);
     }
 
+    private void handleException(String msg) throws AxisFault {
+        log.error(msg);
+        throw new AxisFault(msg);
+    }
+
     // -- jmx/management methods--
     public long getMessagesReceived() {
         if (metrics != null) {

Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java Fri Feb 12 08:57:58 2010
@@ -85,4 +85,5 @@
     public static final String HTTP_REQ_METHOD = "HTTP_REQ_METHOD";
     public static final String NO_ENTITY_BODY = "NO_ENTITY_BODY";
     public static final String ENDPOINT_PREFIX = "ENDPOINT_PREFIX";
+    protected static final String PRIORITY_CONFIG_FILE_NAME = "priorityConfigFile";
 }

Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java Fri Feb 12 08:57:58 2010
@@ -44,10 +44,15 @@
 import org.apache.http.util.EncodingUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.commons.evaluators.Parser;
+import org.apache.synapse.commons.evaluators.EvaluatorContext;
+import org.apache.synapse.commons.executors.PriorityExecutor;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
+import java.util.HashMap;
 
 /**
  * The server connection handler. An instance of this class is used by each IOReactor, to
@@ -81,11 +86,16 @@
     /** the metrics collector */
     private MetricsCollector metrics = null;
 
+    private Parser parser = null;
+
+    private PriorityExecutor executor = null;
+
     public static final String REQUEST_SINK_BUFFER = "synapse.request-sink-buffer";
     public static final String RESPONSE_SOURCE_BUFFER = "synapse.response-source-buffer";
 
     public ServerHandler(final ConfigurationContext cfgCtx, final HttpParams params,
-        final boolean isHttps, final MetricsCollector metrics) {
+        final boolean isHttps, final MetricsCollector metrics,
+        Parser parser, PriorityExecutor executor) {
         super();
         this.cfgCtx = cfgCtx;
         this.params = params;
@@ -97,12 +107,17 @@
         this.allocator = new HeapByteBufferAllocator();
 
         this.cfg = NHttpConfiguration.getInstance();
-        this.workerPool = WorkerPoolFactory.getWorkerPool(
-            cfg.getServerCoreThreads(),
-            cfg.getServerMaxThreads(),
-            cfg.getServerKeepalive(),
-            cfg.getServerQueueLen(),
-            "Server Worker thread group", "HttpServerWorker");
+       if (executor == null)  {
+            this.workerPool = WorkerPoolFactory.getWorkerPool(
+                cfg.getServerCoreThreads(),
+                cfg.getServerMaxThreads(),
+                cfg.getServerKeepalive(),
+                cfg.getServerQueueLen(),
+                "Server Worker thread group", "HttpServerWorker");
+        } else {
+            this.executor = executor;
+            this.parser = parser;
+        }
     }
 
     /**
@@ -148,9 +163,22 @@
             response.setEntity(entity);
 
             // hand off processing of the request to a thread off the pool
-            workerPool.execute(
-                new ServerWorker(cfgCtx, conn, isHttps, metrics, this,
-                    request, is, response, os));
+            ServerWorker worker = new ServerWorker(cfgCtx, conn, isHttps, metrics, this,
+                        request, is, response, os);
+
+            if (workerPool != null) {
+                workerPool.execute(worker);
+            } else if (executor != null) {
+                Map<String, String> headers = new HashMap<String, String>();
+                for (Header header : request.getAllHeaders()) {
+                    headers.put(header.getName(), header.getValue());
+                }
+
+                EvaluatorContext evaluatorContext =
+                        new EvaluatorContext(request.getRequestLine().getUri(), headers);
+                int priority = parser.parse(evaluatorContext);
+                executor.execute(worker, priority);
+            }
 
         } catch (Exception e) {
             if (metrics != null) {
@@ -442,7 +470,11 @@
 
     public void stop() {
         try {
-            workerPool.shutdown(1000);
+            if (workerPool != null) {
+                workerPool.shutdown(1000);
+            } else if (executor != null) {
+                executor.destroy();
+            }
         } catch (InterruptedException ignore) {}
     }
 }