You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/02/22 09:56:49 UTC

svn commit: r1073259 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message: processors/ processors/dlc/ store/

Author: charith
Date: Tue Feb 22 08:56:49 2011
New Revision: 1073259

URL: http://svn.apache.org/viewvc?rev=1073259&view=rev
Log:
adding Message store observer

Added:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java
Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java Tue Feb 22 08:56:49 2011
@@ -21,6 +21,7 @@ package org.apache.synapse.message.proce
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.quartz.*;
 import org.quartz.impl.StdSchedulerFactory;
@@ -52,11 +53,6 @@ public abstract class ScheduledMessagePr
     }
 
     /**
-     * message store parameters
-     */
-    protected Map<String, Object> parameters = null;
-
-    /**
      * The quartz configuration file if specified as a parameter
      */
     protected String quartzConfig = null;
@@ -90,7 +86,8 @@ public abstract class ScheduledMessagePr
 
         JobDetail jobDetail = getJobDetail();
         JobDataMap jobDataMap = new JobDataMap();
-        jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE, messageStore);
+        jobDataMap.put(MessageProcessorConsents.MESSAGE_STORE,
+                configuration.getMessageStore(messageStore));
         jobDataMap.put(MessageProcessorConsents.PARAMETERS, parameters);
         jobDetail.setJobDataMap(jobDataMap);
 
@@ -169,6 +166,8 @@ public abstract class ScheduledMessagePr
             scheduler.start();
 
             state = State.INITIALIZED;
+
+            this.start();
         } catch (SchedulerException e) {
             throw new SynapseException("Error starting the scheduler", e);
         }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java Tue Feb 22 08:56:49 2011
@@ -24,6 +24,7 @@ import org.apache.synapse.FaultHandler;
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.SynapseArtifact;
+import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.mediators.MediatorFaultHandler;
 import org.apache.synapse.message.processors.MessageProcessorConsents;
@@ -51,7 +52,6 @@ public class RedeliveryJob implements Jo
 
     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
         JobDataMap jdm = jobExecutionContext.getMergedJobDataMap();
-
         messageStore = (MessageStore) jdm.get(MessageProcessorConsents.MESSAGE_STORE);
         lock = ((AbstractMessageStore) messageStore).getLock();
         Map<String, Object> parameters = (Map<String, Object>) jdm.get(

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/AbstractMessageStore.java Tue Feb 22 08:56:49 2011
@@ -23,6 +23,8 @@ import org.apache.synapse.commons.jmx.MB
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.core.SynapseEnvironment;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -69,6 +71,11 @@ public abstract class AbstractMessageSto
      */
     protected String fileName;
 
+    /**
+     * List that holds the MessageStore observers registered with the Message Store
+     */
+    protected List<MessageStoreObserver> messageStoreObservers =
+            new ArrayList<MessageStoreObserver>();
 
     protected Lock lock = new ReentrantLock();
 
@@ -91,6 +98,37 @@ public abstract class AbstractMessageSto
     }
 
 
+    public void registerObserver(MessageStoreObserver observer) {
+        if(observer != null && !messageStoreObservers.contains(observer)) {
+            messageStoreObservers.add(observer);
+        }
+    }
+
+    public void unregisterObserver(MessageStoreObserver observer) {
+        if(observer != null && messageStoreObservers.contains(observer)) {
+            messageStoreObservers.remove(observer);
+        }
+    }
+
+    /**
+     * Notify Message Addition to the observers
+     * @param messageId of the Message added.
+     */
+    protected void notifyMessageAddition(String messageId) {
+        for(MessageStoreObserver o : messageStoreObservers) {
+            o.messageAdded(messageId);
+        }
+    }
+
+    /**
+     * Notify Message removal to the observers
+     * @param messageId of the Message added
+     */
+    protected void notifyMessageRemoval(String messageId) {
+        for(MessageStoreObserver o : messageStoreObservers) {
+            o.messageRemoved(messageId);
+        }
+    }
     public int size() {
         return -1;
     }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java Tue Feb 22 08:56:49 2011
@@ -45,7 +45,8 @@ public class InMemoryMessageStore extend
         try {
             if (messageContext != null) {
                 messageList.put(messageContext.getMessageID(), messageContext);
-
+                /** Notify observers */
+                notifyMessageAddition(messageContext.getMessageID());
                 if (log.isDebugEnabled()) {
                     log.debug("Message with id " + messageContext.getMessageID() +
                             " stored");
@@ -65,6 +66,8 @@ public class InMemoryMessageStore extend
             context = peek();
             if(context !=null) {
                 messageList.remove(context.getMessageID());
+                /** Notify observers */
+                notifyMessageRemoval(context.getMessageID());
             }
         } finally {
             lock.unlock();
@@ -101,7 +104,11 @@ public class InMemoryMessageStore extend
         lock.lock();
         try {
             if (messageID != null) {
-                return messageList.remove(messageID);
+               if(messageList.remove(messageID) != null) {
+                   /** Notify observers */
+                    notifyMessageRemoval(messageID);
+               }
+
             }
         } finally {
             lock.unlock();
@@ -115,6 +122,8 @@ public class InMemoryMessageStore extend
 
             for (String k : messageList.keySet()) {
                 messageList.remove(k);
+                /** Notify observers */
+                notifyMessageRemoval(k);
             }
         } finally {
             lock.unlock();

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java?rev=1073259&r1=1073258&r2=1073259&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStore.java Tue Feb 22 08:56:49 2011
@@ -142,6 +142,16 @@ public interface MessageStore extends Sy
 
 
     /**
-     * Todo Add observer api
+     * Register a MessageStore observer instance with the MessageStore
+     * to receive events.
+     * @param observer instance to be registered
      */
+    public void registerObserver(MessageStoreObserver observer);
+
+    /**
+     * Un register an Message store instance from the message store
+     * to stop receiving events
+     * @param observer  instance to be unregistered
+     */
+    public void unregisterObserver(MessageStoreObserver observer);
 }
\ No newline at end of file

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java?rev=1073259&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/MessageStoreObserver.java Tue Feb 22 08:56:49 2011
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.message.store;
+
+
+/**
+ * An implementation of this interface can be registered with a Message store instance to receive
+ * Message store update update events. When ever a message is added/removed events defined in this
+ * interface will be fired
+ */
+public interface MessageStoreObserver {
+
+    /**
+     * Method invoked when a message is added to the store
+     * @param messageId of the message that was added to the Message store
+     */
+    public void messageAdded(String messageId);
+
+    /**
+     * Method invoked when a message is removed from the store
+     * @param messageId of the Message that was removed
+     */
+    public void messageRemoved(String messageId);
+
+
+}