You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2010/05/12 19:24:47 UTC

svn commit: r943590 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse: config/xml/ mediators/eip/sample/

Author: ruwan
Date: Wed May 12 17:24:47 2010
New Revision: 943590

URL: http://svn.apache.org/viewvc?rev=943590&view=rev
Log:
Adding the sampling throttle mediator

Added:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java
Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java?rev=943590&r1=943589&r2=943590&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java Wed May 12 17:24:47 2010
@@ -75,7 +75,8 @@ public class MediatorFactoryFinder imple
         EventPublisherMediatorFactory.class,
         TransactionMediatorFactory.class,
         EnqueueMediatorFactory.class,
-        ConditionalRouterMediatorFactory.class
+        ConditionalRouterMediatorFactory.class,
+        SamplingThrottleMediatorFactory.class
     };
 
     private final static MediatorFactoryFinder instance  = new MediatorFactoryFinder();

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java?rev=943590&r1=943589&r2=943590&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java Wed May 12 17:24:47 2010
@@ -62,7 +62,8 @@ public class MediatorSerializerFinder {
         EventPublisherMediatorSerializer.class,
         TransactionMediatorSerializer.class,
         EnqueueMediatorSerializer.class,
-        ConditionalRouterMediatorSerializer.class
+        ConditionalRouterMediatorSerializer.class,
+        SamplingThrottleMediatorSerializer.class
     };
 
     private final static MediatorSerializerFinder instance = new MediatorSerializerFinder();

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java Wed May 12 17:24:47 2010
@@ -0,0 +1,127 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.mediators.eip.Target;
+import org.apache.synapse.mediators.eip.sample.MessageQueue;
+import org.apache.synapse.mediators.eip.sample.SamplingThrottleMediator;
+
+import javax.xml.namespace.QName;
+
+/**
+ * Builds the {@link org.apache.synapse.mediators.eip.sample.SamplingThrottleMediator} instance by looking at the
+ * following configuration</p>
+ *
+ * <pre>&lt;sampler id="string" rate="int" unitTime="long"&gt;
+ *   &lt;messageQueue class="string"/&gt;
+ *   &lt;target .../&gt;
+ * &lt;sampler/&gt;
+ * </pre>
+ *
+ * @see org.apache.synapse.config.xml.AbstractMediatorFactory
+ */
+public class SamplingThrottleMediatorFactory extends AbstractMediatorFactory {
+
+    private static final QName SAMPLER_Q
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sampler");
+    private static final QName ID_ATTR
+            = new QName(XMLConfigConstants.NULL_NAMESPACE, "id");
+    private static final QName RATE_ATTR
+            = new QName(XMLConfigConstants.NULL_NAMESPACE, "rate");
+    private static final QName UNIT_TIME_ATTR
+            = new QName(XMLConfigConstants.NULL_NAMESPACE, "unitTime");
+    private static final QName MESSAGE_QUEUE_Q
+            = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "messageQueue");
+    private static final QName CLASS_ATTR
+            = new QName(XMLConfigConstants.NULL_NAMESPACE, "class");
+
+    public Mediator createMediator(OMElement omElement) {
+
+        SamplingThrottleMediator samplingThrottleMediator = new SamplingThrottleMediator();
+        processAuditStatus(samplingThrottleMediator, omElement);
+
+        OMAttribute idAttribute = omElement.getAttribute(ID_ATTR);
+        if (idAttribute != null) {
+            samplingThrottleMediator.setId(idAttribute.getAttributeValue());
+        }
+
+        OMAttribute rateAttribute = omElement.getAttribute(RATE_ATTR);
+        if (rateAttribute != null) {
+            try {
+                samplingThrottleMediator.setSamplingRate(
+                        Integer.parseInt(rateAttribute.getAttributeValue()));
+            } catch (NumberFormatException nfe) {
+                handleException("Sampling rate has to be an integer value, but found : "
+                        + rateAttribute.getAttributeValue());
+            }
+        }
+
+        OMAttribute unitTimeAttribute = omElement.getAttribute(UNIT_TIME_ATTR);
+        if (unitTimeAttribute != null) {
+            try {
+                samplingThrottleMediator.setUnitTime(
+                        Long.parseLong(unitTimeAttribute.getAttributeValue()));
+            } catch (NumberFormatException nfe) {
+                handleException("Sampling unitTime has to be a long value in milliseconds, " +
+                        "but found : " + rateAttribute.getAttributeValue());
+            }
+        }
+
+        OMElement targetElem = omElement.getFirstChildWithName(TARGET_Q);
+        if (targetElem != null) {
+            Target target = TargetFactory.createTarget(targetElem);
+            samplingThrottleMediator.setTarget(target);
+        } else {
+            handleException("Sampler requires a target for the sampling mediation");
+        }
+
+        OMElement messageQueueElem = omElement.getFirstChildWithName(MESSAGE_QUEUE_Q);
+        if (messageQueueElem != null && messageQueueElem.getAttribute(CLASS_ATTR) != null) {
+            String className = messageQueueElem.getAttributeValue(CLASS_ATTR);
+            try {
+                Class messageQueueImplClass = Class.forName(className);
+                Object obj = messageQueueImplClass.newInstance();
+                if (obj instanceof MessageQueue) {
+                    samplingThrottleMediator.setMessageQueue((MessageQueue) obj);
+                } else {
+                    handleException("Provided message queue class : " + className
+                            + " doesn't implement the org.apache.synapse.mediators." +
+                            "eip.sample.MessageQueue interface");
+                }
+            } catch (ClassNotFoundException e) {
+                handleException("Couldn't find the class specified for the message queue " +
+                        "implementation : " + className);
+            } catch (InstantiationException e) {
+                handleException("Couldn't instantiate the message queue : " + className);
+            } catch (IllegalAccessException e) {
+                handleException("Couldn't instantiate the message queue : " + className);
+            }
+        }
+
+        return samplingThrottleMediator;
+    }
+
+    public QName getTagQName() {
+        return SAMPLER_Q;
+    }
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java Wed May 12 17:24:47 2010
@@ -0,0 +1,75 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.mediators.eip.sample.SamplingThrottleMediator;
+
+/**
+ * Serializes the {@link org.apache.synapse.mediators.eip.sample.SamplingThrottleMediator} instance
+ * into a XML configuration as follows</p>
+ *
+ * <pre>&lt;sampler id="string" rate="int"/&gt;</pre>
+ *
+ * @see org.apache.synapse.config.xml.AbstractMediatorSerializer
+ */
+public class SamplingThrottleMediatorSerializer extends AbstractMediatorSerializer {
+
+    public OMElement serializeMediator(OMElement omElement, Mediator mediator) {
+        OMElement samplerElem = fac.createOMElement("sampler", synNS);
+        saveTracingState(samplerElem, mediator);
+
+        SamplingThrottleMediator samplingThrottleMediator = (SamplingThrottleMediator) mediator;
+
+        if (samplingThrottleMediator.getId() != null) {
+            samplerElem.addAttribute("id", samplingThrottleMediator.getId(), nullNS);
+        }
+        samplerElem.addAttribute("rate",
+                Integer.toString(samplingThrottleMediator.getSamplingRate()), nullNS);
+        samplerElem.addAttribute("unitTime",
+                Long.toString(samplingThrottleMediator.getUnitTime()), nullNS);
+
+        if (samplingThrottleMediator.isMessageQueueExplicitlySet()) {
+            OMElement messageQueueElem = fac.createOMElement("messageQueue", synNS);
+            messageQueueElem.addAttribute("class",
+                    samplingThrottleMediator.getMessageQueue().getClass().getName(), nullNS);
+            samplerElem.addChild(messageQueueElem);
+        }
+
+        if (samplingThrottleMediator.getTarget() != null) {
+            samplerElem.addChild(
+                    TargetSerializer.serializeTarget(samplingThrottleMediator.getTarget()));
+        } else {
+            handleException("Couldn't find the target for the sampler. " +
+                    "Target is mandatory for a sampler");
+        }
+
+        if (omElement != null) {
+            omElement.addChild(samplerElem);
+        }
+
+        return samplerElem;
+    }
+
+    public String getMediatorClassName() {
+        return SamplingThrottleMediator.class.getName();
+    }
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java Wed May 12 17:24:47 2010
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.mediators.eip.sample;
+
+import org.apache.synapse.MessageContext;
+
+/**
+ *
+ */
+public interface MessageQueue {
+
+    void add(MessageContext synCtx);
+
+    MessageContext get();
+
+    boolean isEmpty();
+
+    boolean isPersistent();
+
+    boolean persist();
+
+    void load();
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java Wed May 12 17:24:47 2010
@@ -0,0 +1,198 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.mediators.eip.sample;
+
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.OperationContext;
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseLog;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.mediators.eip.Target;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * This implements the well known <code>Sample</code> EIP (Enterprise Integration Pattern), which controls the flow
+ * of messages and limit the rate at which the messages are flowing through the sampler</p>
+ *
+ * <p>Please note that the usage of this will require the sampler to be on the out-flow as well to correctly
+ * determine & to manage the rate.</p>
+ *
+ * @see org.apache.synapse.mediators.AbstractMediator
+ */
+public class SamplingThrottleMediator extends AbstractMediator implements ManagedLifecycle {
+
+    /** Rate at which this mediator allows the flow in TPS */
+    private int samplingRate = 1;
+
+    /** Unit time in milliseconds applied to the <code>samplingRate</code> */
+    private long unitTime = 1000;
+
+    /** Identifier is used to co-relate the in and out path samplers */
+    private String id;
+
+    /** Target to be used for mediation from the sampler */
+    private Target target;
+
+    /**
+     * {@link org.apache.synapse.mediators.eip.sample.MessageQueue} implementation to be used,
+     * which defaults to {@link org.apache.synapse.mediators.eip.sample.UnboundedMessageQueue}
+     */
+    private MessageQueue messageQueue = new UnboundedMessageQueue();
+
+    private boolean messageQueueExplicitlySet;
+
+    private TimerTask messageProcessor;
+
+    public void init(SynapseEnvironment synapseEnvironment) {
+
+        if (messageQueue.isPersistent()) {
+            log.info("Loading the persisted messages if there are any to the message queue");
+            messageQueue.load();
+        }
+
+        Timer samplingTimer = synapseEnvironment.getSynapseConfiguration().getSynapseTimer();
+        messageProcessor = new MessageProcessor();
+        log.info("Scheduling the sampling timer to invoke the message processor " +
+                "at an interval of : " + unitTime);
+        samplingTimer.schedule(messageProcessor, 0, unitTime);
+    }
+
+    public void destroy() {
+        messageProcessor.cancel();
+        if (!messageQueue.isEmpty()) {
+            log.warn("There are messages on the sampling message queue, " +
+                    "but the message processor has been destroyed.");
+            if (messageQueue.isPersistent()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Persisting the messages on the message queue");
+                }
+                if (messageQueue.persist()) {
+                    log.info("Completed persisting the messages on the message queue");
+                } else {
+                    log.error("Couldn't persist the messages on the message queue");
+                }
+            } else {
+                log.warn("You are not using a persistent message queue, " +
+                        "you will be loosing messages which are on the queue");
+            }
+        }
+    }
+
+    public boolean mediate(MessageContext messageContext) {
+
+        SynapseLog synLog = getLog(messageContext);
+
+        synLog.traceOrDebug("Start : Sampler mediator");
+        if (synLog.isTraceTraceEnabled()) {
+            synLog.traceTrace("Message : " + messageContext.getEnvelope());
+        }
+
+        if (!messageContext.isResponse()) {
+            if (synLog.isTraceOrDebugEnabled()) {
+                synLog.traceOrDebug("Adding the message with message id : "
+                        + messageContext.getMessageID() + " into the message queue for sampling");
+            }
+            messageQueue.add(messageContext);
+        } else {
+            synLog.auditWarn("Encountered a response message which will not be sampled");
+        }
+
+        OperationContext opCtx
+            = ((Axis2MessageContext) messageContext).getAxis2MessageContext().getOperationContext();
+        if (opCtx != null) {
+            opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
+        }
+
+        synLog.traceOrDebug("End : Sampler mediator");
+        return false;
+    }
+
+    public int getSamplingRate() {
+        return samplingRate;
+    }
+
+    public void setSamplingRate(int samplingRate) {
+        this.samplingRate = samplingRate;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public Target getTarget() {
+        return target;
+    }
+
+    public void setTarget(Target target) {
+        this.target = target;
+    }
+
+    public long getUnitTime() {
+        return unitTime;
+    }
+
+    public void setUnitTime(long unitTime) {
+        this.unitTime = unitTime;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+        this.messageQueueExplicitlySet = true;
+    }
+
+    public boolean isMessageQueueExplicitlySet() {
+        return messageQueueExplicitlySet;
+    }
+
+    private class MessageProcessor extends TimerTask {
+
+        @Override
+        public void run() {
+            if (log.isDebugEnabled()) {
+                log.debug("Started running the message processor");
+            }
+            for (int i = 0; i < samplingRate && !messageQueue.isEmpty(); i++) {
+                MessageContext synCtx = messageQueue.get();
+                if (log.isDebugEnabled()) {
+                    log.debug("Mediating the message on the message queue with message id : "
+                            + synCtx.getMessageID());
+                }
+                target.mediate(synCtx);
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Message processing completed for the given sampling rate");
+            }
+        }
+    }
+
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java Wed May 12 17:24:47 2010
@@ -0,0 +1,62 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.mediators.eip.sample;
+
+import org.apache.synapse.MessageContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class UnboundedMessageQueue implements MessageQueue {
+
+    private List<MessageContext> messageQueue = new ArrayList<MessageContext>();
+
+    public void add(MessageContext synCtx) {
+        messageQueue.add(synCtx);
+    }
+
+    public MessageContext get() {
+        if (!messageQueue.isEmpty()) {
+            return messageQueue.remove(0);
+        } else {
+            return null;
+        }
+    }
+
+    public boolean isEmpty() {
+        return messageQueue.isEmpty();
+    }
+
+    public boolean isPersistent() {
+        return false;
+    }
+
+    public boolean persist() {
+        return false;
+    }
+
+    public void load() {
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+}