You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by in...@apache.org on 2009/08/09 13:10:02 UTC

svn commit: r802517 - in /synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators: ./ seda/

Author: indika
Date: Sun Aug  9 11:10:02 2009
New Revision: 802517

URL: http://svn.apache.org/viewvc?rev=802517&view=rev
Log:
add an experimental SEDA mediator's initial code.  no log or java doc - Will add when experiment is going

Added:
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java
    synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ *
+ */
+public class DefaultConsumer implements SEDAQueueConsumer {
+
+    private static final Executor EXECUTOR
+            = Executors.newFixedThreadPool(10);
+
+    private Mediator mediator;
+
+    public DefaultConsumer(Mediator mediator) {
+        this.mediator = mediator;
+    }
+
+    public void consume(MessageContext messageContext) {
+        EXECUTOR.execute(new Worker(mediator, messageContext));
+    }
+
+    static class Worker implements Runnable {
+
+        private final Mediator mediator;
+        private final MessageContext messageContext;
+
+        private volatile Exception exception;
+
+        public Worker(final Mediator mediator, final MessageContext messageContext) {
+            super();
+            this.mediator = mediator;
+            this.messageContext = messageContext;
+        }
+
+        public void run() {
+            try {
+                mediator.mediate(messageContext);
+            } catch (Exception ex) {
+                this.exception = ex;
+            }
+        }
+
+        public Exception getException() {
+            return this.exception;
+        }
+
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,95 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.mediators.AbstractMediator;
+
+/**
+ *
+ */
+public class SEDAMediator extends AbstractMediator implements ManagedLifecycle {
+
+    private SEDAQueueConsumerPolicy sedaQueueConsumerPolicy;
+    private SEDAQueueProducerPolicy sedaQueueProducerPolicy;
+    private SEDAQueuePolicy sedaQueuePolicy;
+    private SEDAQueue sedaQueue;
+    private SEDAQueueProducer sedaQueueProducer;
+    private boolean initialized = false;
+    // A key of the mediator that do work after SEDA component - actual consumer
+    private String consumer;
+
+    public boolean mediate(MessageContext synCtx) {
+        if (initialized) {
+            sedaQueueProducer.produce(synCtx);
+        }
+        return false;
+    }
+
+    public void init(SynapseEnvironment se) {
+        Mediator mediator = se.getSynapseConfiguration().getSequence(consumer);
+        sedaQueue = new SEDAQueue(sedaQueuePolicy,
+                new SEDAQueueConsumerWorkerFactory(sedaQueueConsumerPolicy,
+                        new DefaultConsumer(mediator)));
+        sedaQueueProducer = new SEDAQueueProducer(sedaQueueProducerPolicy,
+                sedaQueue);
+        sedaQueue.init();
+        initialized = true;
+    }
+
+    public void destroy() {
+        sedaQueue.destory();
+        initialized = false;
+    }
+
+    public SEDAQueueConsumerPolicy getSedaQueueConsumerPolicy() {
+        return sedaQueueConsumerPolicy;
+    }
+
+    public void setSedaQueueConsumerPolicy(SEDAQueueConsumerPolicy sedaQueueConsumerPolicy) {
+        this.sedaQueueConsumerPolicy = sedaQueueConsumerPolicy;
+    }
+
+    public SEDAQueueProducerPolicy getSedaQueueProducerPolicy() {
+        return sedaQueueProducerPolicy;
+    }
+
+    public void setSedaQueueProducerPolicy(SEDAQueueProducerPolicy sedaQueueProducerPolicy) {
+        this.sedaQueueProducerPolicy = sedaQueueProducerPolicy;
+    }
+
+    public SEDAQueuePolicy getSedaQueuePolicy() {
+        return sedaQueuePolicy;
+    }
+
+    public void setSedaQueuePolicy(SEDAQueuePolicy sedaQueuePolicy) {
+        this.sedaQueuePolicy = sedaQueuePolicy;
+    }
+
+    public String getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(String consumer) {
+        this.consumer = consumer;
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,54 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.xml.AbstractMediatorFactory;
+
+import javax.xml.namespace.QName;
+
+/**
+ *
+ */
+public class SEDAMediatorFactory extends AbstractMediatorFactory {
+
+    private static final QName SEDA_Q =
+            new QName(SynapseConstants.SYNAPSE_NAMESPACE, "seda");
+
+    public Mediator createMediator(OMElement elem) {
+
+        final SEDAMediator mediator = new SEDAMediator();
+        String mediatorKey = elem.getAttributeValue(ATT_KEY);
+
+        if (mediatorKey != null && !"".equals(mediatorKey.trim())) {
+            mediator.setConsumer(mediatorKey.trim());
+        }
+        //TODO parse OM element and creates policies
+        mediator.setSedaQueueConsumerPolicy(new SEDAQueueConsumerPolicy());
+        mediator.setSedaQueueProducerPolicy(new SEDAQueueProducerPolicy());
+        mediator.setSedaQueuePolicy(new SEDAQueuePolicy());
+        return mediator;
+    }
+
+    public QName getTagQName() {
+        return SEDA_Q;
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,75 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.synapse.MessageContext;
+
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class SEDAQueue {
+
+    private SEDAQueuePolicy queuePolicy;
+    private final BlockingQueue<MessageContext> queue;
+    private SEDAQueueConsumerWorkerFactory workerFactory;
+    private boolean initialized;
+    private static final Executor EXECUTOR
+            = Executors.newFixedThreadPool(10);
+
+    public SEDAQueue(SEDAQueuePolicy queuePolicy, SEDAQueueConsumerWorkerFactory workerFactory) {
+        this.queuePolicy = queuePolicy;
+        this.queue = createBlockingQueue(queuePolicy);
+        this.workerFactory = workerFactory;
+    }
+
+    private BlockingQueue<MessageContext> createBlockingQueue(SEDAQueuePolicy queuePolicy) {
+        BlockingQueue<MessageContext> queue;
+        String queueType = queuePolicy.getQueueType();
+        int capacity = queuePolicy.getQueueSize();
+        if (SEDAQueuePolicy.QUEUE_TYPE_PRIORITY_BLOCKING.equals(queueType)) {
+            queue = new PriorityBlockingQueue<MessageContext>(capacity);
+        } else if (SEDAQueuePolicy.QUEUE_TYPE_SYNCHRONOUS.equals(queueType)) {
+            queue = new SynchronousQueue<MessageContext>();
+        } else {
+            queue = new LinkedBlockingQueue<MessageContext>(capacity);
+        }
+        return queue;
+    }
+
+    public void init() {
+        EXECUTOR.execute(workerFactory.createSEDAQueueConsumerWorker(this));
+    }
+
+    public void destory() {
+    }
+
+    public BlockingQueue<MessageContext> getQueue() {
+        return queue;
+    }
+
+    public boolean isInitialized() {
+        return initialized;
+    }
+
+    public void setInitialized(boolean initialized) {
+        this.initialized = initialized;
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.synapse.MessageContext;
+
+/**
+ *
+ */
+public interface SEDAQueueConsumer {
+
+    public void consume(MessageContext messageContext);
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+/**
+ *
+ */
+public class SEDAQueueConsumerPolicy {
+
+    public static final String TAKE = "take";
+    public static final String POLL = "poll";
+    public static final String DRAINALL = "drainAll";
+    public static final String DRAIN = "drain";
+    private long timeoutOnPoll = 1000;
+    private int maxElementsOnPoll = 5;
+    private String action = POLL;
+
+    public long getTimeoutOnPoll() {
+        return timeoutOnPoll;
+    }
+
+    public void setTimeoutOnPoll(long timeoutOnPoll) {
+        this.timeoutOnPoll = timeoutOnPoll;
+    }
+
+    public int getMaxElementsOnPoll() {
+        return maxElementsOnPoll;
+    }
+
+    public void setMaxElementsOnPoll(int maxElementsOnPoll) {
+        this.maxElementsOnPoll = maxElementsOnPoll;
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public void setAction(String action) {
+        this.action = action;
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,92 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class SEDAQueueConsumerWorker implements Runnable {
+
+    private static final Log log = LogFactory.getLog(SEDAQueueConsumerWorker.class);
+    private final SEDAQueueConsumerPolicy queueConsumerPolicy;
+    private final SEDAQueue sedaQueue;
+    private final BlockingQueue<MessageContext> queue;
+    private final SEDAQueueConsumer sedaQueueConsumer;
+
+    public SEDAQueueConsumerWorker(SEDAQueue sedaQueue,
+                                   SEDAQueueConsumerPolicy queueConsumerPolicy,
+                                   SEDAQueueConsumer sedaQueueConsumer) {
+        this.sedaQueue = sedaQueue;
+        this.queueConsumerPolicy = queueConsumerPolicy;
+        this.queue = sedaQueue.getQueue();
+        this.sedaQueueConsumer = sedaQueueConsumer;
+    }
+
+    public void run() {
+        String action = queueConsumerPolicy.getAction();
+        if (SEDAQueueConsumerPolicy.TAKE.equals(action)) {
+            try {
+                consume(queue.take());
+            } catch (InterruptedException ignored) {
+                log.debug("Ignored InterruptedException when ocuured calling queue.take()");
+            }
+        } else if (SEDAQueueConsumerPolicy.POLL.equals(action)) {
+            long timeout = queueConsumerPolicy.getTimeoutOnPoll();
+            if (timeout < 0) {
+                consume(queue.poll());
+            } else {
+                try {
+                    consume(queue.poll(timeout, TimeUnit.MILLISECONDS));
+                } catch (InterruptedException ignored) {
+                    log.debug("Ignored InterruptedException when ocuured calling queue.poll()");
+                }
+            }
+        } else if (SEDAQueueConsumerPolicy.DRAIN.equals(action)) {
+            int maxElements = queueConsumerPolicy.getMaxElementsOnPoll();
+            Collection<MessageContext> contexts = new ArrayList<MessageContext>(maxElements);
+            queue.drainTo(contexts, maxElements);
+            consume(contexts);
+        } else if (SEDAQueueConsumerPolicy.DRAINALL.equals(action)) {
+            Collection<MessageContext> contexts = new ArrayList<MessageContext>();
+            queue.drainTo(contexts);
+            consume(contexts);
+        }
+    }
+
+    private void consume(MessageContext context) {
+        if (context != null) {
+            sedaQueueConsumer.consume(context);
+        }
+    }
+
+    private void consume(Collection<MessageContext> contexts) {
+        for (MessageContext context : contexts) {
+            consume(context);
+        }
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+/**
+ *
+ */
+public class SEDAQueueConsumerWorkerFactory {
+
+    private final SEDAQueueConsumerPolicy queueConsumerPolicy;
+    private final SEDAQueueConsumer sedaQueueConsumer;
+
+    public SEDAQueueConsumerWorkerFactory(SEDAQueueConsumerPolicy queueConsumerPolicy,
+                                          SEDAQueueConsumer sedaQueueConsumer) {
+        this.queueConsumerPolicy = queueConsumerPolicy;
+        this.sedaQueueConsumer = sedaQueueConsumer;
+    }
+
+    public SEDAQueueConsumerWorker createSEDAQueueConsumerWorker(SEDAQueue sedaQueue) {
+        return new SEDAQueueConsumerWorker(sedaQueue, queueConsumerPolicy, sedaQueueConsumer);
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,69 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+
+public class SEDAQueuePolicy {
+
+    public static final String QUEUE_TYPE_LINKED_BLOCKING = "LinkedBlocking";
+    public static final String QUEUE_TYPE_PRIORITY_BLOCKING = "PriorityBlocking";
+    public static final String QUEUE_TYPE_SYNCHRONOUS = "Synchronous";
+    private int queueSize;
+    private int queueWorkers;
+    private String queueType;
+    private final Map<String, String> properties = new HashMap<String, String>();
+
+    public int getQueueSize() {
+        return queueSize;
+    }
+
+    public void setQueueSize(int queueSize) {
+        this.queueSize = queueSize;
+    }
+
+    public int getQueueWorkers() {
+        return queueWorkers;
+    }
+
+    public void setQueueWorkers(int queueWorkers) {
+        this.queueWorkers = queueWorkers;
+    }
+
+    public String getQueueType() {
+        return queueType;
+    }
+
+    public void setQueueType(String queueType) {
+        this.queueType = queueType;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public void addProperties(String name, String value) {
+        this.properties.put(name, value);
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,67 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class SEDAQueueProducer {
+
+    private static final Log log = LogFactory.getLog(SEDAQueueProducer.class);
+    private final SEDAQueueProducerPolicy queueProducerPolicy;
+    private final SEDAQueue sedaQueue;
+    private final BlockingQueue<MessageContext> queue;
+
+    public SEDAQueueProducer(SEDAQueueProducerPolicy queueProducerPolicy, SEDAQueue sedaQueue) {
+        this.queueProducerPolicy = queueProducerPolicy;
+        this.sedaQueue = sedaQueue;
+        this.queue = sedaQueue.getQueue();
+    }
+
+    public void produce(MessageContext messageContext) {
+        String action = queueProducerPolicy.getAction();
+        if (SEDAQueueProducerPolicy.ADD.equals(action)) {
+            queue.add(messageContext);
+        } else if (SEDAQueueProducerPolicy.OFFER.equals(action)) {
+            long timeout = queueProducerPolicy.getTimeoutOnInsert();
+            if (timeout < 0) {
+                queue.offer(messageContext);
+            } else {
+                try {
+                    queue.offer(messageContext, timeout, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    log.debug("Ignored InterruptedException when ocuured calling queue.offer");
+                }
+            }
+        } else if (SEDAQueueProducerPolicy.PUT.equals(action)) {
+            try {
+                queue.put(messageContext);
+            } catch (InterruptedException e) {
+                log.debug("Ignored InterruptedException when ocuured calling queue.put");
+            }
+        }
+    }
+}

Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java Sun Aug  9 11:10:02 2009
@@ -0,0 +1,47 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+/**
+ *
+ */
+public class SEDAQueueProducerPolicy {
+
+    public static final String ADD = "add";
+    public static final String OFFER = "offer";
+    public static final String PUT = "put";
+    private long timeoutOnInsert = -1;
+    private String action = OFFER;
+
+    public long getTimeoutOnInsert() {
+        return timeoutOnInsert;
+    }
+
+    public void setTimeoutOnInsert(long timeoutOnInsert) {
+        this.timeoutOnInsert = timeoutOnInsert;
+    }
+
+    public String getAction() {
+        return action;
+    }
+
+    public void setAction(String action) {
+        this.action = action;
+    }
+}