You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2010/02/12 09:58:00 UTC
svn commit: r909303 [2/2] - in /synapse/trunk/java/modules: commons/
commons/src/main/java/org/apache/synapse/commons/evaluators/
commons/src/main/java/org/apache/synapse/commons/evaluators/config/
commons/src/main/java/org/apache/synapse/commons/execu...
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.executors;
+
+import java.util.List;
+
+/**
+ * This interface implements the algorith for determining the next internal
+ * queue for picking up the message. This class is created onece and initialized.
+ * This class should capture any runtime information about the queues since the
+ * MultiPriorityBlockingQueue doesn't hold any runtime state information about
+ * the queues.
+ *
+ * @param <E>
+ */
+public interface NextQueueAlgorithm<E> {
+
+ /**
+ * Initialized with the queues sorted according to the priority.
+ *
+ * @param queues list of queues
+ */
+ void init(List<InternalQueue<E>> queues);
+
+ /**
+ * Should return a queue based on some selection criteria and current
+ * state of the queues.
+ *
+ * @return the queue
+ */
+ InternalQueue<E> getNextQueue();
+}
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.executors;
+
+import java.util.List;
+
+/**
+ * This is a priority based round robin algorithm for getting the next queue </p>
+ *
+ * <p>This algorithm works in cycles. Lets say we have queues with following priorities.
+ * 7, 5, 2 and assume we name the queues as 1, 2, 3 in the order. </p>
+ * <p>Here is how messages are picked in a single cycle
+ * 1, 1, 1, 1, 1, 1, 1 // all the messages for the queue with priority 1 are sent for this cycle
+ * 2, 2, 2, 2, 2, // all the messages for the queue with priority 2 are sent for this cycle
+ * 3, 3 // all the messages with priority 2 are sent for this cycle</p>
+ *
+ * <p>This algorithm choose the queues in the above order if all the queues have messages at the
+ * point of selection. If a queue doesn't have messages it will skip the queue and move to the
+ * next. If none of the queues have messages it will return null.
+ */
+public class PRRNextQueueAlgorithm<E> implements NextQueueAlgorithm<E> {
+
+ /**
+ * We hold a reference to the actual queue
+ */
+ private List<InternalQueue<E>> queues;
+
+ /**
+ * Number of queues, this is just to avoid the overhead of calculating
+ * this again and again
+ */
+ private int size = 0;
+
+ /**
+ * Current queue we are operating on
+ */
+ private int currentQueue = 0;
+
+ /**
+ * Number of messages sent from the current queue
+ */
+ private int currentCount = 0;
+
+ public InternalQueue<E> getNextQueue() {
+ InternalQueue<E> internalQueue = queues.get(currentQueue);
+
+ int priority = internalQueue.getPriority();
+
+ if (priority == currentCount || internalQueue.size() == 0) {
+ currentCount = 0;
+ // we need to move to the next queue not empty
+ int c = 0;
+ do {
+ if (currentQueue == queues.size() - 1) {
+ currentQueue = 0;
+ } else {
+ currentQueue++;
+ }
+
+ internalQueue = queues.get(currentQueue);
+
+ c++;
+ // we move forward until we find a non empty queue or everything is empty
+ } while (internalQueue.size() == 0 && c < size);
+
+ if (internalQueue.size() == 0) {
+ currentQueue = 0;
+ return null;
+ }
+ }
+
+ currentCount++;
+
+ /*log.info("Get the queue with the priority: " +
+ internalQueue.getPriority());*/
+
+ return internalQueue;
+ }
+
+ public void init(List<InternalQueue<E>> queues) {
+ this.queues = queues;
+ size = queues.size();
+ }
+}
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.axis2.transport.base.threads.NativeThreadFactory;
+
+import java.util.concurrent.*;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * This is the class used for executing the tasks with a given priority. It is backed by a
+ * BlockingQueue and a ThreadPoolExecutor. The BlockingQueue is a custom implementation which
+ * has multiple internal queues for handling separate priorities.
+ */
+public class PriorityExecutor {
+ private final Log log = LogFactory.getLog(PriorityExecutor.class);
+
+ /** Actual thread pool executor */
+ private ThreadPoolExecutor executor;
+
+ /** Name of the executor */
+ private String name = null;
+
+ /** Core threads count */
+ private int core = ExecutorConstants.DEFAULT_CORE;
+ /** Max thread count */
+ private int max = ExecutorConstants.DEFAULT_MAX;
+ /** Keep alive time for spare threads */
+ private int keepAlive = ExecutorConstants.DEFAULT_KEEP_ALIVE;
+ /** This will be executed before the Task is submitted */
+ private BeforeExecuteHandler beforeExecuteHandler;
+ /** Queue used by the executor */
+ private MultiPriorityBlockingQueue<Runnable> queue;
+
+ /** this is used by the file based synapse xml configuration */
+ private String fileName;
+
+ /**
+ * Execute a given task with the priority specified.
+ *
+ * @param task task to be executed
+ * @param priority priority of the tast
+ */
+ public void execute(final Runnable task, int priority) {
+ Worker w = new Worker(task, priority);
+
+ if (beforeExecuteHandler != null) {
+ beforeExecuteHandler.beforeExecute(w);
+ }
+ // we are capturing all the exceptions to prevent threads from dying
+ executor.execute(w);
+ }
+
+
+ /**
+ * Initialize the executor by using the properties. Create the queues
+ * and ThreadPool executor.
+ */
+ public void init() {
+ if (queue == null) {
+ throw new IllegalStateException("Queue should be specified before initializing");
+ }
+
+ executor = new ThreadPoolExecutor(core, max, keepAlive, TimeUnit.SECONDS, queue,
+ new NativeThreadFactory(new ThreadGroup("executor-group"), "priority-worker"));
+
+ if (log.isDebugEnabled()) {
+ log.debug("Started the thread pool executor with threads, " +
+ "core = " + core + " max = " + max +
+ ", keep-alive = " + keepAlive);
+ }
+ }
+
+ /**
+ * Destroy the executor.
+ */
+ public void destroy() {
+ if (log.isDebugEnabled()) {
+ log.debug("Shutting down thread pool executor");
+ }
+
+ executor.shutdown();
+
+ try {
+ executor.awaitTermination(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.error("Failed to Shut down Executor");
+ }
+ }
+
+ /**
+ * Set the name of the executor
+ *
+ * @param name of the executor
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the name of the executor
+ *
+ * @return name of the executor
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Set a handler for execute before putting a worker in to the queues.
+ * User can set some properties to the worker at this point. This
+ * allows users to get more control over the queue selection algorithm.
+ * This is an optional configuration.
+ *
+ * @param beforeExecuteHandler an object implementing the BeforeExecuteHandler
+ */
+ public void setBeforeExecuteHandler(
+ BeforeExecuteHandler beforeExecuteHandler) {
+ this.beforeExecuteHandler = beforeExecuteHandler;
+ }
+
+ /**
+ * Get the handler that is executed before the worker is put in to the queue
+ *
+ * @return an object of BeforeExecuteHandler
+ */
+ public BeforeExecuteHandler getBeforeExecuteHandler() {
+ return beforeExecuteHandler;
+ }
+
+ /**
+ * Set the queue
+ *
+ * @param queue queue used for handling the priorities
+ */
+ public void setQueue(MultiPriorityBlockingQueue<Runnable> queue) {
+ this.queue = queue;
+ }
+
+ /**
+ * Get the queue
+ *
+ * @return queue used for handling multiple priorities
+ */
+ public MultiPriorityBlockingQueue<Runnable> getQueue() {
+ return queue;
+ }
+
+ /**
+ * Get the core number of threads
+ *
+ * @return core number of threads
+ */
+ public int getCore() {
+ return core;
+ }
+
+ /**
+ * Get the max threads
+ *
+ * @return max thread
+ */
+ public int getMax() {
+ return max;
+ }
+
+ /**
+ * Get the keep alive time for threads
+ *
+ * @return keep alive time for threads
+ */
+ public int getKeepAlive() {
+ return keepAlive;
+ }
+
+ /**
+ * Set the core number of threads
+ *
+ * @param core core number of threads
+ */
+ public void setCore(int core) {
+ this.core = core;
+ }
+
+ /**
+ * Set the max number of threads
+ *
+ * @param max max threads
+ */
+ public void setMax(int max) {
+ this.max = max;
+ }
+
+ /**
+ * Set the keep alive time for threads
+ *
+ * @param keepAlive keep alive threads
+ */
+ public void setKeepAlive(int keepAlive) {
+ this.keepAlive = keepAlive;
+ }
+
+ /**
+ * Get the file used to store this executor config
+ *
+ * @return file used for storing the config
+ */
+ public String getFileName() {
+ return fileName;
+ }
+
+ /**
+ * Set the file used to store the config
+ *
+ * @param fileName file name
+ */
+ public void setFileName(String fileName) {
+ this.fileName = fileName;
+ }
+
+ /**
+ * Private class for executing the tasks submited. This class is used for
+ * prevent the threads from dying in case of unhandled exceptions. Also
+ * this class implements the Importance for carrying the priority.
+ */
+ private class Worker implements Runnable, Importance {
+ private Runnable runnable = null;
+
+ private Map<String, Object> properties = new HashMap<String, Object>();
+
+ private int priority = 1;
+
+ private Worker(Runnable runnable, int priority) {
+ this.priority = priority;
+ this.runnable = runnable;
+ }
+
+ public void run() {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ log.error("Unhandled exception", e);
+ }
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int p) {
+ this.priority = p;
+ }
+
+ public void setProperty(String name, Object value) {
+ properties.put(name, value);
+ }
+
+ public Object getProperty(String name) {
+ return properties.get(name);
+ }
+ }
+}
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.executors.config;
+
+import org.apache.synapse.commons.executors.*;
+import org.apache.synapse.commons.executors.queues.FixedSizeQueue;
+import org.apache.synapse.commons.executors.queues.UnboundedQueue;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axis2.AxisFault;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+import javax.xml.namespace.QName;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+
+
+public class PriorityExecutorFactory {
+ private static Log log = LogFactory.getLog(PriorityExecutorFactory.class);
+
+ public static final QName NAME_ATT = new QName(ExecutorConstants.NAME);
+ public static final QName CLASS_ATT = new QName("class");
+ public static final QName ATT_NAME = new QName(ExecutorConstants.NAME);
+ public static final QName ATT_VALUE = new QName(ExecutorConstants.VALUE);
+ public static final QName SIZE_ATT = new QName(ExecutorConstants.SIZE);
+ public static final QName PRIORITY_ATT = new QName(ExecutorConstants.PRIORITY);
+
+ public static final QName IS_FIXED_ATT = new QName(ExecutorConstants.IS_FIXED_SIZE);
+
+ public static final QName BEFORE_EXECUTE_HANDLER =
+ new QName(ExecutorConstants.BEFORE_EXECUTE_HANDLER);
+
+ public static final QName NEXT_QUEUE_ATT = new QName(ExecutorConstants.NEXT_QUEUE);
+
+ public static final QName MAX_ATT = new QName(ExecutorConstants.MAX);
+ public static final QName CORE_ATT = new QName(ExecutorConstants.CORE);
+ public static final QName KEEP_ALIVE_ATT = new QName(ExecutorConstants.KEEP_ALIVE);
+
+ public static PriorityExecutor createExecutor(String namespace, OMElement e,
+ boolean requireName) throws AxisFault {
+ QName queuesQName = createQname(namespace, ExecutorConstants.QUEUES);
+ QName queueQName = createQname(namespace, ExecutorConstants.QUEUE);
+
+ QName threadsQName = createQname(namespace, ExecutorConstants.THREADS);
+
+ PriorityExecutor executor = new PriorityExecutor();
+
+ OMAttribute nameAtt = e.getAttribute(NAME_ATT);
+ if (nameAtt != null && !"".equals(nameAtt.getAttributeValue())) {
+ executor.setName(nameAtt.getAttributeValue());
+ } else if (requireName){
+ handlerException(ExecutorConstants.NAME +
+ " is required for a " + ExecutorConstants.PRIORITY_EXECUTOR);
+ }
+
+ // set the handler for calling before the message is put in to the queue
+ OMAttribute handlerAtt = e.getAttribute(BEFORE_EXECUTE_HANDLER);
+ if (handlerAtt != null) {
+ BeforeExecuteHandler beh =
+ createExecuteBeforeHandler(handlerAtt.getAttributeValue());
+ executor.setBeforeExecuteHandler(beh);
+ }
+
+ // create the queue configuration
+ OMElement queuesEle = e.getFirstChildWithName(queuesQName);
+ if (queuesEle != null) {
+ OMAttribute nextQueueAtt = queuesEle.getAttribute(NEXT_QUEUE_ATT);
+
+ NextQueueAlgorithm<Runnable> nqa = null;
+ if (nextQueueAtt != null) {
+ nqa = createNextQueueAlgo(nextQueueAtt.getAttributeValue());
+ }
+
+ boolean isFixedSize = true;
+
+ OMAttribute fixedSizeAtt = queuesEle.getAttribute(IS_FIXED_ATT);
+ if (fixedSizeAtt != null) {
+ isFixedSize = Boolean.parseBoolean(fixedSizeAtt.getAttributeValue());
+ }
+
+ // create the queue configuration
+ List<InternalQueue<Runnable>> intQueues = createQueues(queueQName, queuesEle, isFixedSize);
+
+ MultiPriorityBlockingQueue queue =
+ new MultiPriorityBlockingQueue(intQueues, isFixedSize, nqa);
+
+ executor.setQueue(queue);
+ } else {
+ handlerException("Queues configuration is mandatory");
+ }
+
+ OMElement threadsEle = e.getFirstChildWithName(threadsQName);
+ if (threadsEle != null) {
+ OMAttribute maxAttr = threadsEle.getAttribute(MAX_ATT);
+ if (maxAttr != null) {
+ executor.setMax(Integer.parseInt(maxAttr.getAttributeValue()));
+ }
+ OMAttribute coreAttr = threadsEle.getAttribute(CORE_ATT);
+ if (coreAttr != null) {
+ executor.setCore(Integer.parseInt(coreAttr.getAttributeValue()));
+ }
+ OMAttribute keepAliveAttr = threadsEle.getAttribute(KEEP_ALIVE_ATT);
+ if (keepAliveAttr != null) {
+ executor.setKeepAlive(Integer.parseInt(keepAliveAttr.getAttributeValue()));
+ }
+ }
+
+ return executor;
+ }
+
+ private static List<InternalQueue<Runnable>> createQueues(
+ QName qQName, OMElement queuesEle, boolean isFixedSize) throws AxisFault {
+
+ List<InternalQueue<Runnable>> internalQueues =
+ new ArrayList<InternalQueue<Runnable>>();
+
+ Iterator it = queuesEle.getChildrenWithName(qQName);
+ while (it.hasNext()) {
+ OMElement qElement = (OMElement) it.next();
+ String size = qElement.getAttributeValue(SIZE_ATT);
+ String priority = qElement.getAttributeValue(PRIORITY_ATT);
+ int s = 0;
+ int p = 0;
+ if (priority != null) {
+ p = Integer.parseInt(priority);
+ } else {
+ handlerException("Priority must be specified");
+ }
+
+ if (size != null) {
+ s = Integer.parseInt(size);
+ isFixedSize = true;
+ } else if (isFixedSize) {
+ handlerException("Queues should have a " + ExecutorConstants.SIZE);
+ }
+
+ InternalQueue<Runnable> queue;
+ if (isFixedSize) {
+ queue = new FixedSizeQueue<Runnable>(p, s);
+ } else {
+ queue = new UnboundedQueue<Runnable>(p);
+ }
+
+ internalQueues.add(queue);
+ }
+
+ return internalQueues;
+ }
+
+ private static BeforeExecuteHandler createExecuteBeforeHandler(
+ String className) throws AxisFault {
+ try {
+ Class c = Class.forName(className);
+ Object o = c.newInstance();
+
+ if (o instanceof BeforeExecuteHandler) {
+ return (BeforeExecuteHandler) o;
+ } else {
+ handlerException("Before execute handler class, " +
+ className +
+ " is not type of BeforeExecuteHandler");
+ }
+ } catch (ClassNotFoundException e1) {
+ handlerException("Before execute handler class, " +
+ className +
+ " is not found");
+ } catch (IllegalAccessException e1) {
+ handlerException("Before execute handler class, " +
+ className +
+ " cannot be accessed");
+ } catch (InstantiationException e1) {
+ handlerException("Before execute handler class, " +
+ className +
+ " cannot be instantiated");
+ }
+ return null;
+ }
+
+ private static NextQueueAlgorithm<Runnable> createNextQueueAlgo(
+ String className) throws AxisFault {
+ try {
+ Class c = Class.forName(className);
+ Object o = c.newInstance();
+
+ if (o instanceof NextQueueAlgorithm) {
+ return (NextQueueAlgorithm<Runnable>) o;
+ } else {
+ handlerException("NextQueue algorithm class, " +
+ className +
+ " is not type of BeforeExecuteHandler");
+ }
+ } catch (ClassNotFoundException e1) {
+ handlerException("NextQueue algorithm class, " +
+ className + " is not found");
+ } catch (IllegalAccessException e1) {
+ handlerException("NextQueue algorithm class, " +
+ className + " cannot be accessed");
+ } catch (InstantiationException e1) {
+ handlerException("NextQueue algorithm class, " +
+ className + " cannot be instantiated");
+ }
+ return null;
+ }
+
+ private static QName createQname(String namespace, String name) {
+ if (namespace == null) {
+ return new QName(name);
+ }
+ return new QName(namespace, name);
+ }
+
+ private static void handlerException(String message) throws AxisFault {
+ log.error(message);
+ throw new AxisFault(message);
+ }
+}
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.executors.config;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.synapse.commons.executors.*;
+
+import javax.xml.namespace.QName;
+import java.util.List;
+
+public class PriorityExecutorSerializer {
+
+ public static OMElement serialize(OMElement parent,
+ PriorityExecutor executor, String namespace) {
+ QName executorQName = createQname(namespace, ExecutorConstants.PRIORITY_EXECUTOR);
+ QName queuesQName = createQname(namespace, ExecutorConstants.QUEUES);
+ QName queueQName = createQname(namespace, ExecutorConstants.QUEUE);
+ QName threadsQName = createQname(namespace, ExecutorConstants.THREADS);
+
+
+ OMFactory fac = OMAbstractFactory.getOMFactory();
+ OMElement executorElement = fac.createOMElement(executorQName);
+ OMNamespace nullNS = fac.createOMNamespace("", "");
+
+ if (executor.getName() != null) {
+ executorElement.addAttribute(fac.createOMAttribute(ExecutorConstants.NAME,
+ nullNS, executor.getName()));
+ }
+
+ if (executor.getBeforeExecuteHandler() != null) {
+ executorElement.addAttribute(fac.createOMAttribute(
+ ExecutorConstants.BEFORE_EXECUTE_HANDLER, nullNS,
+ executor.getBeforeExecuteHandler().getClass().getName()));
+ }
+
+ // create the queues configuration
+ MultiPriorityBlockingQueue queue = executor.getQueue();
+ NextQueueAlgorithm algo = queue.getNextQueue();
+ OMElement queuesEle = fac.createOMElement(queuesQName);
+
+ if (!(algo instanceof PRRNextQueueAlgorithm)) {
+ queuesEle.addAttribute(fac.createOMAttribute(ExecutorConstants.NEXT_QUEUE, nullNS,
+ algo.getClass().getName()));
+ }
+
+ if (!queue.isFixedSizeQueues()) {
+ queuesEle.addAttribute(fac.createOMAttribute(ExecutorConstants.IS_FIXED_SIZE,
+ nullNS, Boolean.toString(false)));
+ }
+
+ List<InternalQueue> intQueues = queue.getQueues();
+ for (InternalQueue intQueue : intQueues) {
+ OMElement queueEle = fac.createOMElement(queueQName);
+
+ if (queue.isFixedSizeQueues()) {
+ queueEle.addAttribute(fac.createOMAttribute(ExecutorConstants.SIZE, nullNS,
+ Integer.toString(intQueue.getCapacity())));
+ }
+
+ queueEle.addAttribute(fac.createOMAttribute(ExecutorConstants.PRIORITY, nullNS,
+ Integer.toString(intQueue.getPriority())));
+
+ queuesEle.addChild(queueEle);
+ }
+ executorElement.addChild(queuesEle);
+
+ // create the Threads configuration
+ OMElement threadsEle = fac.createOMElement(threadsQName);
+ threadsEle.addAttribute(fac.createOMAttribute(
+ ExecutorConstants.MAX, nullNS, Integer.toString(executor.getMax())));
+ threadsEle.addAttribute(fac.createOMAttribute(
+ ExecutorConstants.CORE, nullNS, Integer.toString(executor.getCore())));
+ threadsEle.addAttribute(fac.createOMAttribute(
+ ExecutorConstants.KEEP_ALIVE, nullNS, Integer.toString(executor.getKeepAlive())));
+
+ executorElement.addChild(threadsEle);
+
+ if (parent != null) {
+ parent.addChild(executorElement);
+ }
+
+ return executorElement;
+ }
+
+ private static QName createQname(String namespace, String name) {
+ if (namespace == null) {
+ return new QName(name);
+ }
+ return new QName(namespace, name, "syn");
+ }
+}
+
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.executors.queues;
+
+import org.apache.synapse.commons.executors.InternalQueue;
+
+import java.util.concurrent.locks.Condition;
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Collection;
+
+/**
+ * A bounded queue implementation for internal queues. This queue is backed by an
+ * fixed size array.
+ *
+ * @param <E> Should implement the Importance interface
+ */
+public class FixedSizeQueue<E> extends AbstractQueue<E> implements InternalQueue<E> {
+
+ /**
+ * Priority of this queue
+ */
+ private int priority;
+
+ /**
+ * A waiting queue when this queue is full
+ */
+ private Condition notFullCond;
+
+ /**
+ * Array holding the queues
+ */
+ private E[] array;
+
+ /**
+ * Capacity of the queue
+ */
+ private int capacity;
+
+ /**
+ * Number of elements in the queue
+ */
+ private int count = 0;
+
+ /**
+ * Head of the queue
+ */
+ private int head;
+
+ /**
+ * Tail of the queue
+ */
+ private int tail;
+
+
+ public FixedSizeQueue(int priority, int capacity) {
+ this.priority = priority;
+ this.capacity = capacity;
+
+ array = (E[]) new Object[capacity];
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int p) {
+ this.priority = p;
+ }
+
+ public Condition getNotFullCond() {
+ return notFullCond;
+ }
+
+ public void setNotFullCond(Condition notFullCond) {
+ this.notFullCond = notFullCond;
+ }
+
+ public Iterator<E> iterator() {return null;}
+
+ public int size() {
+ return count;
+ }
+
+ public String toString() {
+ return super.toString() + this.priority;
+ }
+
+ public boolean offer(E e) {
+ if (count == array.length) {
+ return false;
+ } else {
+ insert(e);
+ return true;
+ }
+ }
+
+ public E poll() {
+ if (count == 0)
+ return null;
+ return get();
+ }
+
+ public E peek() {
+ return (count == 0) ? null : array[head];
+ }
+
+ public int remainingCapacity() {
+ return capacity - count;
+ }
+
+ public int drainTo(Collection<? super E> c) {
+ final E[] items = this.array;
+ int i = head;
+ int n = 0;
+ int max = count;
+ while (n < max) {
+ c.add(items[i]);
+ items[i] = null;
+ i = increment(i);
+ n++;
+ }
+ if (n > 0) {
+ count = 0;
+ tail = 0;
+ head = 0;
+
+ }
+ return n;
+ }
+
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ final E[] items = this.array;
+ int i = head;
+ int n = 0;
+ int max = (maxElements < count) ? maxElements : count;
+ while (n < max) {
+ c.add(items[i]);
+ items[i] = null;
+ i = increment(i);
+ n++;
+ }
+ if (n > 0) {
+ count -= n;
+ head = i;
+ }
+ return n;
+ }
+
+ public int getCapacity() {
+ return capacity;
+ }
+
+ private int increment(int i) {
+ return (++i == array.length)? 0 : i;
+ }
+
+ private void insert(E e) {
+ array[tail] = e;
+ tail = increment(tail);
+ count++;
+ }
+
+ private E get() {
+ E e = array[head];
+ array[head] = null;
+ head = increment(head);
+ count--;
+ return e;
+ }
+}
Added: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java (added)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.commons.executors.queues;
+
+import org.apache.synapse.commons.executors.InternalQueue;
+
+import java.util.*;
+import java.util.concurrent.locks.Condition;
+
+/**
+ * An unbounded queue
+ * @param <E>
+ */
+public class UnboundedQueue<E> extends AbstractQueue<E> implements InternalQueue<E> {
+
+ private List<E> elements = new ArrayList<E>();
+
+ /**
+ * Priority of this queue
+ */
+ private int priority;
+
+ /**
+ * A waiting queue when this queue is full
+ */
+ private Condition notFullCond;
+
+ public UnboundedQueue(int priority) {
+ this.priority = priority;
+ }
+
+ public Iterator<E> iterator() {
+ return elements.iterator();
+ }
+
+ public int size() {
+ return elements.size();
+ }
+
+ public boolean offer(E e) {
+ return elements.add(e);
+ }
+
+ public E poll() {
+ return elements.remove(elements.size() - 1);
+ }
+
+ public E peek() {
+ return elements.get(elements.size() - 1);
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int p) {
+ this.priority = p;
+ }
+
+ public Condition getNotFullCond() {
+ return notFullCond;
+ }
+
+ public void setNotFullCond(Condition condition) {
+ this.notFullCond = condition;
+ }
+
+ public int drainTo(Collection<? super E> c) {
+ int count = elements.size();
+ c.addAll(elements);
+ elements.clear();
+ return count;
+ }
+
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ if (maxElements >= elements.size()) {
+ return drainTo(c);
+ } else {
+ elements.subList(elements.size() - maxElements - 1, elements.size());
+ return maxElements;
+ }
+ }
+
+ public int remainingCapacity() {
+ return Integer.MAX_VALUE;
+ }
+
+ public int getCapacity() {
+ return Integer.MAX_VALUE;
+ }
+}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/SynapseConfiguration.java Fri Feb 12 08:57:58 2010
@@ -26,6 +26,7 @@
import org.apache.synapse.*;
import org.apache.synapse.eventing.SynapseEventSource;
import org.apache.synapse.commons.datasource.DataSourceHelper;
+import org.apache.synapse.commons.executors.PriorityExecutor;
import org.apache.synapse.config.xml.MediatorFactoryFinder;
import org.apache.synapse.config.xml.endpoints.XMLToEndpointMapper;
import org.apache.synapse.core.SynapseEnvironment;
@@ -116,6 +117,11 @@
private List<SynapseObserver> observers = new ArrayList<SynapseObserver>();
/**
+ * Executors for executing sequences with priorities
+ */
+ private Map<String, PriorityExecutor> executors = new HashMap<String, PriorityExecutor>();
+
+ /**
* Add a named sequence into the local registry. If a sequence already exists by the specified
* key a runtime exception is thrown.
*
@@ -1022,6 +1028,11 @@
stp.init(se);
}
}
+
+ // initialize sequence executors
+ for (PriorityExecutor executor : getPriorityExecutors().values()) {
+ executor.init();
+ }
}
private void handleException(String msg) {
@@ -1089,6 +1100,33 @@
return Collections.unmodifiableList(observers);
}
+ /**
+ * Add an executor
+ * @param name name of the executor
+ * @param executor executor
+ */
+ public void addPriorityExecutor(String name, PriorityExecutor executor) {
+ executors.put(name, executor);
+ }
+
+ /**
+ * Get the executors map
+ * @return exectors map, stored as name of executor and executor
+ */
+ public Map<String, PriorityExecutor> getPriorityExecutors() {
+ return executors;
+ }
+
+ /**
+ * Removes an executor from the configuration
+ *
+ * @param name name of the executor
+ * @return removed executor
+ */
+ public PriorityExecutor removeExecutor(String name) {
+ return executors.remove(name);
+ }
+
private void assertAlreadyExists(String key, String type) {
if (key == null || "".equals(key)) {
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorFactory.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorFactory.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.mediators.builtin.EnqueueMediator;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+
+import javax.xml.namespace.QName;
+
+
+public class EnqueueMediatorFactory extends AbstractMediatorFactory{
+ public static final QName ENQUEUE_Q = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "enqueue");
+ public static final QName SEQUENCE_ATT = new QName("sequence");
+ public static final QName PRIORITY_ATT = new QName("priority");
+ public static final QName QUEUE_ATT = new QName("executor");
+
+ public Mediator createMediator(OMElement elem) {
+ EnqueueMediator mediator = new EnqueueMediator();
+
+ OMAttribute seqAtt = elem.getAttribute(SEQUENCE_ATT);
+ if (seqAtt != null && !"".equals(seqAtt.getAttributeValue())) {
+ mediator.setSequenceName(seqAtt.getAttributeValue());
+ } else {
+ handleException("sequence is a required attribue");
+ }
+
+ OMAttribute priorityAtt = elem.getAttribute(PRIORITY_ATT);
+ if (priorityAtt != null && !"".equals(priorityAtt.getAttributeValue())) {
+ mediator.setPriority(Integer.parseInt(priorityAtt.getAttributeValue()));
+ }
+
+ OMAttribute queueAtt = elem.getAttribute(QUEUE_ATT);
+ if (queueAtt != null && !"".equals(queueAtt.getAttributeValue())) {
+ mediator.setExecutorName(queueAtt.getAttributeValue());
+ } else {
+ handleException("Queue is a required attribue");
+ }
+
+ return mediator;
+ }
+
+ public QName getTagQName() {
+ return ENQUEUE_Q;
+ }
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorSerializer.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorSerializer.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/EnqueueMediatorSerializer.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.mediators.builtin.EnqueueMediator;
+
+
+public class EnqueueMediatorSerializer extends AbstractMediatorSerializer{
+ public OMElement serializeMediator(OMElement parent, Mediator m) {
+ assert m instanceof EnqueueMediator :
+ "Unsupported mediator passed in for serialization : " + m.getType();
+
+ EnqueueMediator mediator = (EnqueueMediator) m;
+ OMElement enqueue = fac.createOMElement("enqueue", synNS);
+ saveTracingState(enqueue, mediator);
+
+ if (mediator.getExecutorName() != null) {
+ enqueue.addAttribute(fac.createOMAttribute(
+ "executor", nullNS, mediator.getExecutorName()));
+ } else {
+ handleException("Invalid enqueue mediator. queue is required");
+ }
+
+ if (mediator.getSequenceName() != null) {
+ enqueue.addAttribute(fac.createOMAttribute(
+ "sequence", nullNS, mediator.getSequenceName()));
+ } else {
+ handleException("Invalid enqueue mediator. sequence is required");
+ }
+
+ if (mediator.getPriority() != 1) {
+ enqueue.addAttribute(fac.createOMAttribute(
+ "priority", nullNS, mediator.getPriority() + ""));
+ }
+
+ if (parent != null) {
+ parent.addChild(enqueue);
+ }
+
+ return enqueue;
+ }
+
+ public String getMediatorClassName() {
+ return EnqueueMediator.class.getName();
+ }
+}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java Fri Feb 12 08:57:58 2010
@@ -73,7 +73,8 @@
CacheMediatorFactory.class,
CalloutMediatorFactory.class,
EventPublisherMediatorFactory.class,
- TransactionMediatorFactory.class
+ TransactionMediatorFactory.class,
+ EnqueueMediatorFactory.class
};
private final static MediatorFactoryFinder instance = new MediatorFactoryFinder();
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java Fri Feb 12 08:57:58 2010
@@ -60,7 +60,8 @@
CacheMediatorSerializer.class,
CalloutMediatorSerializer.class,
EventPublisherMediatorSerializer.class,
- TransactionMediatorSerializer.class
+ TransactionMediatorSerializer.class,
+ EnqueueMediatorSerializer.class
};
private final static MediatorSerializerFinder instance = new MediatorSerializerFinder();
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationBuilder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationBuilder.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationBuilder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationBuilder.java Fri Feb 12 08:57:58 2010
@@ -25,6 +25,7 @@
import org.apache.synapse.Mediator;
import org.apache.synapse.Startup;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.commons.executors.PriorityExecutor;
import org.apache.synapse.config.Entry;
import org.apache.synapse.config.SynapseConfigUtils;
import org.apache.synapse.config.SynapseConfiguration;
@@ -72,6 +73,7 @@
public static final String LOCAL_ENTRY_DIR = "local-entries";
public static final String TASKS_DIR = "tasks";
public static final String EVENTS_DIR = "event-sources";
+ public static final String EXECUTORS_DIR = "priority-executors";
public static final String REGISTRY_FILE = "registry.xml";
@@ -116,6 +118,7 @@
createProxyServices(synapseConfig, root);
createTasks(synapseConfig, root);
createEventSources(synapseConfig, root);
+ createExecutors(synapseConfig, root);
return synapseConfig;
}
@@ -276,6 +279,26 @@
}
}
+ private static void createExecutors(SynapseConfiguration synapseConfig, String rootDirPath)
+ throws XMLStreamException {
+
+ File eventsDir = new File(rootDirPath, EXECUTORS_DIR);
+ if (eventsDir.exists()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Loading event sources from : " + eventsDir.getPath());
+ }
+ File[] events = eventsDir.listFiles(filter);
+ for (File file : events) {
+ try {
+ OMElement document = parseFile(file);
+ PriorityExecutor executor = SynapseXMLConfigurationFactory.
+ defineExecutor(synapseConfig, document);
+ executor.setFileName(file.getName());
+ } catch (FileNotFoundException ignored) {}
+ }
+ }
+ }
+
private static OMElement parseFile(File file)
throws FileNotFoundException, XMLStreamException {
InputStream is = new FileInputStream(file);
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MultiXMLConfigurationSerializer.java Fri Feb 12 08:57:58 2010
@@ -28,6 +28,8 @@
import org.apache.synapse.eventing.SynapseEventSource;
import org.apache.synapse.Startup;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.commons.executors.config.PriorityExecutorSerializer;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.endpoints.AbstractEndpoint;
import org.apache.synapse.mediators.base.SequenceMediator;
@@ -86,6 +88,7 @@
serializeEventSources(synapseConfig.getEventSources(), definitions);
serializeTasks(synapseConfig.getStartups(), definitions);
serializeLocalRegistryValues(synapseConfig.getLocalRegistry().values(), definitions);
+ serializeExecutors(synapseConfig.getPriorityExecutors().values(), definitions);
// Now serialize the content to synapse.xml
serializeSynapseXML(definitions);
@@ -145,6 +148,12 @@
throw new Exception("Error while creating the directory for tasks : " +
tasksDir.getAbsolutePath());
}
+
+ File executorDir = new File(rootDirectory, MultiXMLConfigurationBuilder.EXECUTORS_DIR);
+ if (!executorDir.exists() && !executorDir.mkdirs()) {
+ throw new Exception("Error while creating the directory for tasks : " +
+ executorDir.getAbsolutePath());
+ }
}
/**
@@ -170,6 +179,7 @@
Collection<SynapseEventSource> eventSources = synapseConfig.getEventSources();
Collection<Startup> tasks = synapseConfig.getStartups();
Collection localEntries = synapseConfig.getLocalRegistry().values();
+ Collection<PriorityExecutor> executors = synapseConfig.getPriorityExecutors().values();
for (ProxyService service : proxyServices) {
if (service.getFileName() == null) {
@@ -217,6 +227,11 @@
}
}
+ for (PriorityExecutor executor : executors) {
+ PriorityExecutorSerializer.serialize(definitions, executor,
+ SynapseConstants.SYNAPSE_NAMESPACE);
+ }
+
serializeSynapseXML(definitions);
}
@@ -488,4 +503,30 @@
}
}
+ private void serializeExecutors(Collection<PriorityExecutor> executors,
+ OMElement parent) throws Exception {
+ for (PriorityExecutor source : executors) {
+ serializeExecutor(source, parent);
+ }
+ }
+
+ private OMElement serializeExecutor(PriorityExecutor source, OMElement parent) throws Exception {
+ File executorDir = new File(rootDirectory, MultiXMLConfigurationBuilder.EXECUTORS_DIR);
+ if (!executorDir.exists() && !executorDir.mkdirs()) {
+ throw new Exception("Error while creating the directory for executors : " +
+ executorDir.getAbsolutePath());
+ }
+
+ OMElement eventDirElem = PriorityExecutorSerializer.serialize(null, source,
+ SynapseConstants.SYNAPSE_NAMESPACE);
+
+ if (source.getFileName() != null) {
+ File eventSrcFile = new File(executorDir, source.getFileName());
+ writeToFile(eventDirElem, eventSrcFile);
+ } else if (parent != null) {
+ parent.addChild(eventDirElem);
+ }
+
+ return eventDirElem;
+ }
}
\ No newline at end of file
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationFactory.java Fri Feb 12 08:57:58 2010
@@ -26,6 +26,8 @@
import org.apache.synapse.Startup;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
import org.apache.synapse.aspects.AspectConfiguration;
import org.apache.synapse.config.Entry;
import org.apache.synapse.config.SynapseConfigUtils;
@@ -37,6 +39,7 @@
import org.apache.synapse.eventing.SynapseEventSource;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.registry.Registry;
+import org.apache.axis2.AxisFault;
import javax.xml.namespace.QName;
import java.util.Iterator;
@@ -86,6 +89,8 @@
defineRegistry(config, elt);
} else if (XMLConfigConstants.EVENT_SOURCE_ELT.equals(elt.getQName())) {
defineEventSource(config, elt);
+ } else if (XMLConfigConstants.EXECUTOR_ELT.equals(elt.getQName())) {
+ defineExecutor(config, elt);
} else if (StartupFinder.getInstance().isStartup(elt.getQName())) {
defineStartup(config, elt);
} else {
@@ -179,6 +184,19 @@
return eventSource;
}
+ public static PriorityExecutor defineExecutor(SynapseConfiguration config,
+ OMElement elem) {
+ PriorityExecutor executor = null;
+ try {
+ executor = PriorityExecutorFactory.createExecutor(
+ XMLConfigConstants.SYNAPSE_NAMESPACE, elem, true);
+ } catch (AxisFault axisFault) {
+ handleException("Failed to create the priority-executor configuration");
+ }
+ config.addPriorityExecutor(executor.getName(), executor);
+ return executor;
+ }
+
private static void handleException(String msg) {
log.error(msg);
throw new SynapseException(msg);
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SynapseXMLConfigurationSerializer.java Fri Feb 12 08:57:58 2010
@@ -20,10 +20,12 @@
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.xml.endpoints.EndpointSerializer;
import org.apache.synapse.config.xml.eventing.EventSourceSerializer;
+import org.apache.synapse.commons.executors.config.PriorityExecutorSerializer;
import org.apache.synapse.core.axis2.ProxyService;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.Startup;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.commons.executors.PriorityExecutor;
import org.apache.synapse.eventing.SynapseEventSource;
public class SynapseXMLConfigurationSerializer implements ConfigurationSerializer {
@@ -100,6 +102,8 @@
// handle startups
serializeStartups(definitions, synCfg.getStartups());
+ // Executors
+ serializeExecutors(definitions, synCfg.getPriorityExecutors());
return definitions;
}
@@ -139,6 +143,18 @@
}
}
+ private static void serializeExecutors(OMElement definitions,
+ Map<String, PriorityExecutor> executors) {
+ for (Object o : executors.keySet()) {
+ if (o instanceof String) {
+ String key = (String) o;
+ PriorityExecutor executor = executors.get(key);
+ PriorityExecutorSerializer.serialize(definitions, executor,
+ XMLConfigConstants.SYNAPSE_NAMESPACE);
+ }
+ }
+ }
+
private static void handleException(String msg) {
log.error(msg);
throw new SynapseException(msg);
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/XMLConfigConstants.java Fri Feb 12 08:57:58 2010
@@ -65,6 +65,7 @@
public static final String NULL_NAMESPACE = "";
public static final Object QUARTZ_QNAME =
new QName("http://www.opensymphony.com/quartz/JobSchedulingData", "quartz");
+ public static final QName EXECUTOR_ELT = new QName(SYNAPSE_NAMESPACE, "priority-executor");
/** The Trace attribute name, for proxy services, sequences */
public static final String TRACE_ATTRIB_NAME = "trace";
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/executors/SequenceWorker.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/executors/SequenceWorker.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/executors/SequenceWorker.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/executors/SequenceWorker.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.executors;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class SequenceWorker implements Runnable {
+
+ private static final Log log = LogFactory.getLog(SequenceWorker.class);
+
+ private MessageContext messageContext = null;
+
+ private Mediator sequence = null;
+
+ public SequenceWorker(MessageContext messageContext,
+ Mediator sequence) {
+ this.messageContext = messageContext;
+ this.sequence = sequence;
+ }
+
+ public void run() {
+ try {
+ sequence.mediate(messageContext);
+ } catch (Throwable e) {
+ String msg = "Exception occurred while trying to execute the sequence ";
+ if (sequence instanceof SequenceMediator) {
+ log.error(msg + ((SequenceMediator)sequence).getName());
+ }
+ }
+ }
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/EnqueueMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/EnqueueMediator.java?rev=909303&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/EnqueueMediator.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/EnqueueMediator.java Fri Feb 12 08:57:58 2010
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.mediators.builtin;
+
+import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseLog;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.executors.SequenceWorker;
+
+/**
+ * This mediator execute a given sequence with a given priority.
+ *
+ * It accepts the priority as and argument. The executor used for executing this
+ * sequence should support this priority. If it doesn't support this priority it
+ * executor can throw exceptions.
+ */
+public class EnqueueMediator extends AbstractMediator {
+ private String executorName = null;
+
+ private int priority = 1;
+
+ private String sequenceName = null;
+
+ public boolean mediate(MessageContext synCtx) {
+ SynapseLog log = getLog(synCtx);
+ if (log.isTraceOrDebugEnabled()) {
+ log.traceOrDebug("Start: enqueue mediator");
+ }
+
+ assert executorName != null : "executor name shouldn't be null";
+
+ PriorityExecutor executor = synCtx.getConfiguration().
+ getPriorityExecutors().get(executorName);
+ if (executor == null) {
+ handleException("executor cannot be found for the name : " + executorName, synCtx);
+ return false;
+ }
+
+
+ Mediator m = synCtx.getSequence(sequenceName);
+ if (m != null && m instanceof SequenceMediator) {
+
+ SequenceWorker worker = new SequenceWorker(synCtx, m);
+
+ executor.execute(worker, priority);
+
+ if (log.isTraceOrDebugEnabled()) {
+ log.traceOrDebug("End: enqueue mediator");
+ }
+
+ // with the nio transport, this causes the listener not to write a 202
+ // Accepted response, as this implies that Synapse does not yet know if
+ // a 202 or 200 response would be written back.
+ ((Axis2MessageContext) synCtx).getAxis2MessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN, "SKIP");
+
+ return true;
+ } else {
+ handleException("Sequence cannot be found : " + sequenceName, synCtx);
+ return false;
+ }
+ }
+
+ public String getExecutorName() {
+ return executorName;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public String getSequenceName() {
+ return sequenceName;
+ }
+
+ public void setExecutorName(String executorName) {
+ this.executorName = executorName;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public void setSequenceName(String sequenceName) {
+ this.sequenceName = sequenceName;
+ }
+}
Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java Fri Feb 12 08:57:58 2010
@@ -19,6 +19,8 @@
package org.apache.synapse.transport.nhttp;
import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
@@ -45,10 +47,20 @@
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
+import org.apache.synapse.commons.executors.PriorityExecutor;
+import org.apache.synapse.commons.executors.ExecutorConstants;
+import org.apache.synapse.commons.executors.config.PriorityExecutorFactory;
+import org.apache.synapse.commons.evaluators.Parser;
+import org.apache.synapse.commons.evaluators.EvaluatorException;
+import org.apache.synapse.commons.evaluators.EvaluatorConstants;
import javax.net.ssl.SSLContext;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.namespace.QName;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -98,6 +110,10 @@
private int state = BaseConstants.STOPPED;
/** The ServerHandler */
private ServerHandler handler = null;
+ /** This will execute the requests based on calculate priority */
+ private PriorityExecutor executor = null;
+ /** parser for calculating the priority of incoming messages */
+ private Parser parser = null;
protected IOEventDispatch getEventDispatch(
NHttpServiceHandler handler, SSLContext sslContext,
@@ -182,6 +198,63 @@
mbeanSupport
= new TransportMBeanSupport(this, "nio-http" + (sslContext == null ? "" : "s"));
mbeanSupport.register();
+
+ // create the priority based executor and parser
+ param = transprtIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
+ if (param != null && param.getValue() != null) {
+ createPriorityConfiguration(param.getValue().toString());
+ }
+ }
+
+ private void createPriorityConfiguration(String fileName) throws AxisFault {
+ OMElement definitions = null;
+ try {
+ FileInputStream fis = new FileInputStream(fileName);
+ definitions = new StAXOMBuilder(fis).getDocumentElement();
+ definitions.build();
+ } catch (FileNotFoundException e) {
+ handleException("Priority configuration file cannot be found : " + fileName, e);
+ } catch (XMLStreamException e) {
+ handleException("Error parsing priority configuration xml file " + fileName, e);
+ }
+
+ assert definitions != null;
+
+ OMElement executorElem = definitions.getFirstChildWithName(
+ new QName(ExecutorConstants.PRIORITY_EXECUTOR));
+
+ if (executorElem == null) {
+ handleException(ExecutorConstants.PRIORITY_EXECUTOR +
+ " configuration is mandatory for priority based routing");
+ }
+
+ executor = PriorityExecutorFactory.createExecutor(null, executorElem, false);
+ OMElement conditionsElem = definitions.getFirstChildWithName(
+ new QName(EvaluatorConstants.CONDITIONS));
+ if (conditionsElem == null) {
+ handleException("Conditions configuration is mandatory for priority based routing");
+ }
+
+ executor.init();
+
+ assert conditionsElem != null;
+ OMAttribute defPriorityAttr = conditionsElem.getAttribute(
+ new QName(EvaluatorConstants.DEFAULT_PRIORITY));
+ if (defPriorityAttr != null) {
+ parser = new Parser(Integer.parseInt(defPriorityAttr.getAttributeValue()));
+ } else {
+ parser = new Parser();
+ }
+
+ try {
+ parser.init(conditionsElem);
+ } catch (EvaluatorException e) {
+ handleException("Invalid " + EvaluatorConstants.CONDITIONS +
+ " configuration for priority based mediation", e);
+ }
+
+ log.info("Created a priority based executor from the configuration: " +
+ fileName);
}
/**
@@ -272,7 +345,7 @@
addToServiceURIMap((AxisService) obj);
}
- handler = new ServerHandler(cfgCtx, params, sslContext != null, metrics);
+ handler = new ServerHandler(cfgCtx, params, sslContext != null, metrics, parser, executor);
final IOEventDispatch ioEventDispatch = getEventDispatch(handler,
sslContext, sslIOSessionHandler, params);
state = BaseConstants.STARTED;
@@ -549,6 +622,11 @@
throw new AxisFault(msg, e);
}
+ private void handleException(String msg) throws AxisFault {
+ log.error(msg);
+ throw new AxisFault(msg);
+ }
+
// -- jmx/management methods--
public long getMessagesReceived() {
if (metrics != null) {
Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java Fri Feb 12 08:57:58 2010
@@ -85,4 +85,5 @@
public static final String HTTP_REQ_METHOD = "HTTP_REQ_METHOD";
public static final String NO_ENTITY_BODY = "NO_ENTITY_BODY";
public static final String ENDPOINT_PREFIX = "ENDPOINT_PREFIX";
+ protected static final String PRIORITY_CONFIG_FILE_NAME = "priorityConfigFile";
}
Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=909303&r1=909302&r2=909303&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java Fri Feb 12 08:57:58 2010
@@ -44,10 +44,15 @@
import org.apache.http.util.EncodingUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.commons.evaluators.Parser;
+import org.apache.synapse.commons.evaluators.EvaluatorContext;
+import org.apache.synapse.commons.executors.PriorityExecutor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
+import java.util.HashMap;
/**
* The server connection handler. An instance of this class is used by each IOReactor, to
@@ -81,11 +86,16 @@
/** the metrics collector */
private MetricsCollector metrics = null;
+ private Parser parser = null;
+
+ private PriorityExecutor executor = null;
+
public static final String REQUEST_SINK_BUFFER = "synapse.request-sink-buffer";
public static final String RESPONSE_SOURCE_BUFFER = "synapse.response-source-buffer";
public ServerHandler(final ConfigurationContext cfgCtx, final HttpParams params,
- final boolean isHttps, final MetricsCollector metrics) {
+ final boolean isHttps, final MetricsCollector metrics,
+ Parser parser, PriorityExecutor executor) {
super();
this.cfgCtx = cfgCtx;
this.params = params;
@@ -97,12 +107,17 @@
this.allocator = new HeapByteBufferAllocator();
this.cfg = NHttpConfiguration.getInstance();
- this.workerPool = WorkerPoolFactory.getWorkerPool(
- cfg.getServerCoreThreads(),
- cfg.getServerMaxThreads(),
- cfg.getServerKeepalive(),
- cfg.getServerQueueLen(),
- "Server Worker thread group", "HttpServerWorker");
+ if (executor == null) {
+ this.workerPool = WorkerPoolFactory.getWorkerPool(
+ cfg.getServerCoreThreads(),
+ cfg.getServerMaxThreads(),
+ cfg.getServerKeepalive(),
+ cfg.getServerQueueLen(),
+ "Server Worker thread group", "HttpServerWorker");
+ } else {
+ this.executor = executor;
+ this.parser = parser;
+ }
}
/**
@@ -148,9 +163,22 @@
response.setEntity(entity);
// hand off processing of the request to a thread off the pool
- workerPool.execute(
- new ServerWorker(cfgCtx, conn, isHttps, metrics, this,
- request, is, response, os));
+ ServerWorker worker = new ServerWorker(cfgCtx, conn, isHttps, metrics, this,
+ request, is, response, os);
+
+ if (workerPool != null) {
+ workerPool.execute(worker);
+ } else if (executor != null) {
+ Map<String, String> headers = new HashMap<String, String>();
+ for (Header header : request.getAllHeaders()) {
+ headers.put(header.getName(), header.getValue());
+ }
+
+ EvaluatorContext evaluatorContext =
+ new EvaluatorContext(request.getRequestLine().getUri(), headers);
+ int priority = parser.parse(evaluatorContext);
+ executor.execute(worker, priority);
+ }
} catch (Exception e) {
if (metrics != null) {
@@ -442,7 +470,11 @@
public void stop() {
try {
- workerPool.shutdown(1000);
+ if (workerPool != null) {
+ workerPool.shutdown(1000);
+ } else if (executor != null) {
+ executor.destroy();
+ }
} catch (InterruptedException ignore) {}
}
}