You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by in...@apache.org on 2009/08/09 13:10:02 UTC
svn commit: r802517 - in
/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators:
./ seda/
Author: indika
Date: Sun Aug 9 11:10:02 2009
New Revision: 802517
URL: http://svn.apache.org/viewvc?rev=802517&view=rev
Log:
add an experimental SEDA mediator's initial code. no log or java doc - Will add when experiment is going
Added:
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java
synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/DefaultConsumer.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ *
+ */
+public class DefaultConsumer implements SEDAQueueConsumer {
+
+ private static final Executor EXECUTOR
+ = Executors.newFixedThreadPool(10);
+
+ private Mediator mediator;
+
+ public DefaultConsumer(Mediator mediator) {
+ this.mediator = mediator;
+ }
+
+ public void consume(MessageContext messageContext) {
+ EXECUTOR.execute(new Worker(mediator, messageContext));
+ }
+
+ static class Worker implements Runnable {
+
+ private final Mediator mediator;
+ private final MessageContext messageContext;
+
+ private volatile Exception exception;
+
+ public Worker(final Mediator mediator, final MessageContext messageContext) {
+ super();
+ this.mediator = mediator;
+ this.messageContext = messageContext;
+ }
+
+ public void run() {
+ try {
+ mediator.mediate(messageContext);
+ } catch (Exception ex) {
+ this.exception = ex;
+ }
+ }
+
+ public Exception getException() {
+ return this.exception;
+ }
+
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediator.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.mediators.AbstractMediator;
+
+/**
+ *
+ */
+public class SEDAMediator extends AbstractMediator implements ManagedLifecycle {
+
+ private SEDAQueueConsumerPolicy sedaQueueConsumerPolicy;
+ private SEDAQueueProducerPolicy sedaQueueProducerPolicy;
+ private SEDAQueuePolicy sedaQueuePolicy;
+ private SEDAQueue sedaQueue;
+ private SEDAQueueProducer sedaQueueProducer;
+ private boolean initialized = false;
+ // A key of the mediator that do work after SEDA component - actual consumer
+ private String consumer;
+
+ public boolean mediate(MessageContext synCtx) {
+ if (initialized) {
+ sedaQueueProducer.produce(synCtx);
+ }
+ return false;
+ }
+
+ public void init(SynapseEnvironment se) {
+ Mediator mediator = se.getSynapseConfiguration().getSequence(consumer);
+ sedaQueue = new SEDAQueue(sedaQueuePolicy,
+ new SEDAQueueConsumerWorkerFactory(sedaQueueConsumerPolicy,
+ new DefaultConsumer(mediator)));
+ sedaQueueProducer = new SEDAQueueProducer(sedaQueueProducerPolicy,
+ sedaQueue);
+ sedaQueue.init();
+ initialized = true;
+ }
+
+ public void destroy() {
+ sedaQueue.destory();
+ initialized = false;
+ }
+
+ public SEDAQueueConsumerPolicy getSedaQueueConsumerPolicy() {
+ return sedaQueueConsumerPolicy;
+ }
+
+ public void setSedaQueueConsumerPolicy(SEDAQueueConsumerPolicy sedaQueueConsumerPolicy) {
+ this.sedaQueueConsumerPolicy = sedaQueueConsumerPolicy;
+ }
+
+ public SEDAQueueProducerPolicy getSedaQueueProducerPolicy() {
+ return sedaQueueProducerPolicy;
+ }
+
+ public void setSedaQueueProducerPolicy(SEDAQueueProducerPolicy sedaQueueProducerPolicy) {
+ this.sedaQueueProducerPolicy = sedaQueueProducerPolicy;
+ }
+
+ public SEDAQueuePolicy getSedaQueuePolicy() {
+ return sedaQueuePolicy;
+ }
+
+ public void setSedaQueuePolicy(SEDAQueuePolicy sedaQueuePolicy) {
+ this.sedaQueuePolicy = sedaQueuePolicy;
+ }
+
+ public String getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(String consumer) {
+ this.consumer = consumer;
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAMediatorFactory.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.xml.AbstractMediatorFactory;
+
+import javax.xml.namespace.QName;
+
+/**
+ *
+ */
+public class SEDAMediatorFactory extends AbstractMediatorFactory {
+
+ private static final QName SEDA_Q =
+ new QName(SynapseConstants.SYNAPSE_NAMESPACE, "seda");
+
+ public Mediator createMediator(OMElement elem) {
+
+ final SEDAMediator mediator = new SEDAMediator();
+ String mediatorKey = elem.getAttributeValue(ATT_KEY);
+
+ if (mediatorKey != null && !"".equals(mediatorKey.trim())) {
+ mediator.setConsumer(mediatorKey.trim());
+ }
+ //TODO parse OM element and creates policies
+ mediator.setSedaQueueConsumerPolicy(new SEDAQueueConsumerPolicy());
+ mediator.setSedaQueueProducerPolicy(new SEDAQueueProducerPolicy());
+ mediator.setSedaQueuePolicy(new SEDAQueuePolicy());
+ return mediator;
+ }
+
+ public QName getTagQName() {
+ return SEDA_Q;
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueue.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.synapse.MessageContext;
+
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class SEDAQueue {
+
+ private SEDAQueuePolicy queuePolicy;
+ private final BlockingQueue<MessageContext> queue;
+ private SEDAQueueConsumerWorkerFactory workerFactory;
+ private boolean initialized;
+ private static final Executor EXECUTOR
+ = Executors.newFixedThreadPool(10);
+
+ public SEDAQueue(SEDAQueuePolicy queuePolicy, SEDAQueueConsumerWorkerFactory workerFactory) {
+ this.queuePolicy = queuePolicy;
+ this.queue = createBlockingQueue(queuePolicy);
+ this.workerFactory = workerFactory;
+ }
+
+ private BlockingQueue<MessageContext> createBlockingQueue(SEDAQueuePolicy queuePolicy) {
+ BlockingQueue<MessageContext> queue;
+ String queueType = queuePolicy.getQueueType();
+ int capacity = queuePolicy.getQueueSize();
+ if (SEDAQueuePolicy.QUEUE_TYPE_PRIORITY_BLOCKING.equals(queueType)) {
+ queue = new PriorityBlockingQueue<MessageContext>(capacity);
+ } else if (SEDAQueuePolicy.QUEUE_TYPE_SYNCHRONOUS.equals(queueType)) {
+ queue = new SynchronousQueue<MessageContext>();
+ } else {
+ queue = new LinkedBlockingQueue<MessageContext>(capacity);
+ }
+ return queue;
+ }
+
+ public void init() {
+ EXECUTOR.execute(workerFactory.createSEDAQueueConsumerWorker(this));
+ }
+
+ public void destory() {
+ }
+
+ public BlockingQueue<MessageContext> getQueue() {
+ return queue;
+ }
+
+ public boolean isInitialized() {
+ return initialized;
+ }
+
+ public void setInitialized(boolean initialized) {
+ this.initialized = initialized;
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumer.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.synapse.MessageContext;
+
+/**
+ *
+ */
+public interface SEDAQueueConsumer {
+
+ public void consume(MessageContext messageContext);
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerPolicy.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+/**
+ *
+ */
+public class SEDAQueueConsumerPolicy {
+
+ public static final String TAKE = "take";
+ public static final String POLL = "poll";
+ public static final String DRAINALL = "drainAll";
+ public static final String DRAIN = "drain";
+ private long timeoutOnPoll = 1000;
+ private int maxElementsOnPoll = 5;
+ private String action = POLL;
+
+ public long getTimeoutOnPoll() {
+ return timeoutOnPoll;
+ }
+
+ public void setTimeoutOnPoll(long timeoutOnPoll) {
+ this.timeoutOnPoll = timeoutOnPoll;
+ }
+
+ public int getMaxElementsOnPoll() {
+ return maxElementsOnPoll;
+ }
+
+ public void setMaxElementsOnPoll(int maxElementsOnPoll) {
+ this.maxElementsOnPoll = maxElementsOnPoll;
+ }
+
+ public String getAction() {
+ return action;
+ }
+
+ public void setAction(String action) {
+ this.action = action;
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorker.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class SEDAQueueConsumerWorker implements Runnable {
+
+ private static final Log log = LogFactory.getLog(SEDAQueueConsumerWorker.class);
+ private final SEDAQueueConsumerPolicy queueConsumerPolicy;
+ private final SEDAQueue sedaQueue;
+ private final BlockingQueue<MessageContext> queue;
+ private final SEDAQueueConsumer sedaQueueConsumer;
+
+ public SEDAQueueConsumerWorker(SEDAQueue sedaQueue,
+ SEDAQueueConsumerPolicy queueConsumerPolicy,
+ SEDAQueueConsumer sedaQueueConsumer) {
+ this.sedaQueue = sedaQueue;
+ this.queueConsumerPolicy = queueConsumerPolicy;
+ this.queue = sedaQueue.getQueue();
+ this.sedaQueueConsumer = sedaQueueConsumer;
+ }
+
+ public void run() {
+ String action = queueConsumerPolicy.getAction();
+ if (SEDAQueueConsumerPolicy.TAKE.equals(action)) {
+ try {
+ consume(queue.take());
+ } catch (InterruptedException ignored) {
+ log.debug("Ignored InterruptedException when ocuured calling queue.take()");
+ }
+ } else if (SEDAQueueConsumerPolicy.POLL.equals(action)) {
+ long timeout = queueConsumerPolicy.getTimeoutOnPoll();
+ if (timeout < 0) {
+ consume(queue.poll());
+ } else {
+ try {
+ consume(queue.poll(timeout, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException ignored) {
+ log.debug("Ignored InterruptedException when ocuured calling queue.poll()");
+ }
+ }
+ } else if (SEDAQueueConsumerPolicy.DRAIN.equals(action)) {
+ int maxElements = queueConsumerPolicy.getMaxElementsOnPoll();
+ Collection<MessageContext> contexts = new ArrayList<MessageContext>(maxElements);
+ queue.drainTo(contexts, maxElements);
+ consume(contexts);
+ } else if (SEDAQueueConsumerPolicy.DRAINALL.equals(action)) {
+ Collection<MessageContext> contexts = new ArrayList<MessageContext>();
+ queue.drainTo(contexts);
+ consume(contexts);
+ }
+ }
+
+ private void consume(MessageContext context) {
+ if (context != null) {
+ sedaQueueConsumer.consume(context);
+ }
+ }
+
+ private void consume(Collection<MessageContext> contexts) {
+ for (MessageContext context : contexts) {
+ consume(context);
+ }
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueConsumerWorkerFactory.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+/**
+ *
+ */
+public class SEDAQueueConsumerWorkerFactory {
+
+ private final SEDAQueueConsumerPolicy queueConsumerPolicy;
+ private final SEDAQueueConsumer sedaQueueConsumer;
+
+ public SEDAQueueConsumerWorkerFactory(SEDAQueueConsumerPolicy queueConsumerPolicy,
+ SEDAQueueConsumer sedaQueueConsumer) {
+ this.queueConsumerPolicy = queueConsumerPolicy;
+ this.sedaQueueConsumer = sedaQueueConsumer;
+ }
+
+ public SEDAQueueConsumerWorker createSEDAQueueConsumerWorker(SEDAQueue sedaQueue) {
+ return new SEDAQueueConsumerWorker(sedaQueue, queueConsumerPolicy, sedaQueueConsumer);
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueuePolicy.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ *
+ */
+
+public class SEDAQueuePolicy {
+
+ public static final String QUEUE_TYPE_LINKED_BLOCKING = "LinkedBlocking";
+ public static final String QUEUE_TYPE_PRIORITY_BLOCKING = "PriorityBlocking";
+ public static final String QUEUE_TYPE_SYNCHRONOUS = "Synchronous";
+ private int queueSize;
+ private int queueWorkers;
+ private String queueType;
+ private final Map<String, String> properties = new HashMap<String, String>();
+
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ public void setQueueSize(int queueSize) {
+ this.queueSize = queueSize;
+ }
+
+ public int getQueueWorkers() {
+ return queueWorkers;
+ }
+
+ public void setQueueWorkers(int queueWorkers) {
+ this.queueWorkers = queueWorkers;
+ }
+
+ public String getQueueType() {
+ return queueType;
+ }
+
+ public void setQueueType(String queueType) {
+ this.queueType = queueType;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void addProperties(String name, String value) {
+ this.properties.put(name, value);
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducer.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public class SEDAQueueProducer {
+
+ private static final Log log = LogFactory.getLog(SEDAQueueProducer.class);
+ private final SEDAQueueProducerPolicy queueProducerPolicy;
+ private final SEDAQueue sedaQueue;
+ private final BlockingQueue<MessageContext> queue;
+
+ public SEDAQueueProducer(SEDAQueueProducerPolicy queueProducerPolicy, SEDAQueue sedaQueue) {
+ this.queueProducerPolicy = queueProducerPolicy;
+ this.sedaQueue = sedaQueue;
+ this.queue = sedaQueue.getQueue();
+ }
+
+ public void produce(MessageContext messageContext) {
+ String action = queueProducerPolicy.getAction();
+ if (SEDAQueueProducerPolicy.ADD.equals(action)) {
+ queue.add(messageContext);
+ } else if (SEDAQueueProducerPolicy.OFFER.equals(action)) {
+ long timeout = queueProducerPolicy.getTimeoutOnInsert();
+ if (timeout < 0) {
+ queue.offer(messageContext);
+ } else {
+ try {
+ queue.offer(messageContext, timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.debug("Ignored InterruptedException when ocuured calling queue.offer");
+ }
+ }
+ } else if (SEDAQueueProducerPolicy.PUT.equals(action)) {
+ try {
+ queue.put(messageContext);
+ } catch (InterruptedException e) {
+ log.debug("Ignored InterruptedException when ocuured calling queue.put");
+ }
+ }
+ }
+}
Added: synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java?rev=802517&view=auto
==============================================================================
--- synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java (added)
+++ synapse/trunk/java/modules/experimental/src/main/java/org/apache/synapse/experimental/mediators/seda/SEDAQueueProducerPolicy.java Sun Aug 9 11:10:02 2009
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.experimental.mediators.seda;
+
+/**
+ *
+ */
+public class SEDAQueueProducerPolicy {
+
+ public static final String ADD = "add";
+ public static final String OFFER = "offer";
+ public static final String PUT = "put";
+ private long timeoutOnInsert = -1;
+ private String action = OFFER;
+
+ public long getTimeoutOnInsert() {
+ return timeoutOnInsert;
+ }
+
+ public void setTimeoutOnInsert(long timeoutOnInsert) {
+ this.timeoutOnInsert = timeoutOnInsert;
+ }
+
+ public String getAction() {
+ return action;
+ }
+
+ public void setAction(String action) {
+ this.action = action;
+ }
+}