You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by su...@apache.org on 2011/02/15 19:44:35 UTC

svn commit: r1071010 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message: processors/sampler/SamplingJob.java processors/sampler/SamplingProcessor.java store/InMemoryMessageStore.java

Author: supun
Date: Tue Feb 15 18:44:35 2011
New Revision: 1071010

URL: http://svn.apache.org/viewvc?rev=1071010&view=rev
Log:
Improve the sampler and update the in-memory message store to use synchronization.

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1071010&r1=1071009&r2=1071010&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java Tue Feb 15 18:44:35 2011
@@ -31,6 +31,7 @@ import org.quartz.JobExecutionException;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
 
 public class SamplingJob implements Job {
     private static Log log = LogFactory.getLog(SamplingJob.class);
@@ -42,6 +43,7 @@ public class SamplingJob implements Job 
         final MessageStore messageStore = (MessageStore) jdm.get(SamplingProcessor.MESSAGE_STORE);
         final ExecutorService executor = (ExecutorService) jdm.get(SamplingProcessor.EXECUTOR);
         final String sequence = (String) jdm.get(SamplingProcessor.SEQUENCE);
+        final Lock lock = (Lock) jdm.get(SamplingProcessor.LOCK);
 
         int conc = 1;
         if (concurrency instanceof Integer) {
@@ -49,24 +51,27 @@ public class SamplingJob implements Job 
         }
 
         for (int i = 0; i < conc; i++) {
-            List<MessageContext> list = messageStore.getMessages(0, 0);
-
-            if (list != null && list.size() == 1) {
-                final MessageContext messageContext = list.get(0);
-                if (messageContext != null) {
-                    messageStore.unstore(0, 0);
-                    executor.submit(new Runnable() {
-                        public void run() {
-                            try {
-                                Mediator processingSequence = messageContext.getSequence(sequence);
-                                if (processingSequence != null) {
-                                    processingSequence.mediate(messageContext);
+            //lock.lock();
+            synchronized (messageStore){
+                List<MessageContext> list = messageStore.getMessages(0, 0);
+
+                if (list != null && list.size() == 1) {
+                    final MessageContext messageContext = list.get(0);
+                    if (messageContext != null) {
+                        messageStore.unstore(0, 0);
+                        executor.submit(new Runnable() {
+                            public void run() {
+                                try {
+                                    Mediator processingSequence = messageContext.getSequence(sequence);
+                                    if (processingSequence != null) {
+                                        processingSequence.mediate(messageContext);
+                                    }
+                                } catch (Throwable t) {
+                                    log.error("Error occurred while executing the message", t);
                                 }
-                            } catch (Throwable t) {
-                                log.error("Error occurred while executing the message", t);
                             }
-                        }
-                    });
+                        });
+                    }
                 }
             }
         }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1071010&r1=1071009&r2=1071010&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java Tue Feb 15 18:44:35 2011
@@ -31,10 +31,13 @@ import org.quartz.impl.StdSchedulerFacto
 import java.text.ParseException;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class SamplingProcessor implements MessageProcessor {
     private Log log = LogFactory.getLog(SamplingProcessor.class);
 
+    public static final String LOCK = "lock";
     public static final String EXECUTOR = "Executor";
     public static final String MESSAGE_STORE = "MESSAGE_STORE";
     public static final String QUARTZ_CONF = "quartz.conf";
@@ -80,6 +83,8 @@ public class SamplingProcessor implement
     /** A sequence to run when the sampler is executed */
     private String sequence = null;
 
+    private Lock lock = new ReentrantLock();
+
     /**
      * Creates a Quartz Scheduler and schedule the message processing logic.
      */
@@ -108,6 +113,7 @@ public class SamplingProcessor implement
         jobDataMap.put(EXECUTOR, executor);
         jobDataMap.put(MESSAGE_STORE, messageStore);
         jobDataMap.put(SEQUENCE, sequence);
+        jobDataMap.put(LOCK, lock);
 
         jobDetail.setJobDataMap(jobDataMap);
 

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1071010&r1=1071009&r2=1071010&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java Tue Feb 15 18:44:35 2011
@@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.synapse.MessageContext;
 
 import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * InMemory Message store will store Failed Messages in the local memory
@@ -35,98 +37,149 @@ public class InMemoryMessageStore extend
     /** The map that keeps the stored messages */
     private Map<String, MessageContext> messageList = new HashMap<String, MessageContext>();
 
+    private Lock lock = new ReentrantLock();
+
     public void store(MessageContext messageContext) {
-        if (messageContext != null) {
-            mediateSequence(messageContext);
-            messageList.put(messageContext.getMessageID(), messageContext);
-
-            if (log.isDebugEnabled()) {
-                log.debug("Message " + messageContext.getMessageID() +
-                        " has been stored");
+        lock.lock();
+        try {
+            if (messageContext != null) {
+                mediateSequence(messageContext);
+                messageList.put(messageContext.getMessageID(), messageContext);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Message " + messageContext.getMessageID() +
+                            " has been stored");
+                }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
     public MessageContext unstore(String messageID) {
-        if (messageID != null) {
-            return messageList.remove(messageID);
+        lock.lock();
+        try {
+            if (messageID != null) {
+                return messageList.remove(messageID);
+            }
+        } finally {
+            lock.unlock();
         }
         return null;
     }
 
     public List<MessageContext> unstoreAll() {
-        List<MessageContext> returnList = new ArrayList<MessageContext>();
-        for (String k : messageList.keySet()) {
-            returnList.add(messageList.remove(k));
+        lock.lock();
+        try {
+            List<MessageContext> returnList = new ArrayList<MessageContext>();
+            for (String k : messageList.keySet()) {
+                returnList.add(messageList.remove(k));
+            }
+            return returnList;
+        } finally {
+            lock.unlock();
         }
-        return returnList;
     }
 
     public List<MessageContext> unstore(int maxNumberOfMessages) {
-        List<MessageContext> returnList = new ArrayList<MessageContext>();
-        Iterator<String> it = messageList.keySet().iterator();
-        while (it.hasNext() && maxNumberOfMessages > 0) {
-            returnList.add(messageList.get(it.next()));
-            maxNumberOfMessages--;
-        }
+        lock.lock();
+        try {
+            List<MessageContext> returnList = new ArrayList<MessageContext>();
+            Iterator<String> it = messageList.keySet().iterator();
+            while (it.hasNext() && maxNumberOfMessages > 0) {
+                returnList.add(messageList.get(it.next()));
+                maxNumberOfMessages--;
+            }
 
-        return returnList;
+            return returnList;
+        } finally {
+            lock.unlock();
+        }
     }
 
     public List<MessageContext> unstore(int from, int to) {
-        List<MessageContext> returnlist = new ArrayList<MessageContext>();
-        if (from <= to && (from <= messageList.size() && to <= messageList.size()) && messageList.size() > 0) {
-
-            String[] keys = messageList.keySet().toArray(new String[messageList.keySet().size()]);
-
-            for (int i = from; i <= to; i++) {
-                returnlist.add(messageList.remove(keys[i]));
+        lock.lock();
+        try {
+            List<MessageContext> returnlist = new ArrayList<MessageContext>();
+            if (from <= to && (from <= messageList.size() && to <= messageList.size()) && messageList.size() > 0) {
+
+                String[] keys = messageList.keySet().toArray(new String[messageList.keySet().size()]);
+
+                for (int i = from; i <= to; i++) {
+                    returnlist.add(messageList.remove(keys[i]));
+                }
             }
+            return returnlist;
+        } finally {
+            lock.unlock();
         }
-        return returnlist;
     }
 
     public List<MessageContext> getMessages(int from, int to) {
-        List<MessageContext> returnList = new ArrayList<MessageContext>();
-        if (from <= to && (from <= messageList.size() && to <= messageList.size()) && messageList.size() > 0) {
-            String[] keys = messageList.keySet().toArray(new String[messageList.keySet().size()]);
-
-            for (int i = from; i <= to; i++) {
-                returnList.add(messageList.get(keys[i]));
+        lock.lock();
+        try {
+            List<MessageContext> returnList = new ArrayList<MessageContext>();
+            if (from <= to && (from <= messageList.size() && to <= messageList.size()) && messageList.size() > 0) {
+                String[] keys = messageList.keySet().toArray(new String[messageList.keySet().size()]);
+
+                for (int i = from; i <= to; i++) {
+                    returnList.add(messageList.get(keys[i]));
+                }
             }
+            return returnList;
+        } finally {
+            lock.unlock();
         }
-        return returnList;
     }
 
     public List<MessageContext> getAllMessages() {
-        List<MessageContext> returnList = new ArrayList<MessageContext>();
-        for (Map.Entry<String ,MessageContext> entry :messageList.entrySet()) {
-            returnList.add(entry.getValue());
+        lock.lock();
+        try {
+            List<MessageContext> returnList = new ArrayList<MessageContext>();
+            for (Map.Entry<String, MessageContext> entry : messageList.entrySet()) {
+                returnList.add(entry.getValue());
+            }
+            return returnList;
+        } finally {
+            lock.unlock();
         }
-        return returnList;
     }
 
     public MessageContext getMessage(String messageId) {
-        if (messageId != null) {
-            return messageList.get(messageId);
+        lock.lock();
+        try {
+            if (messageId != null) {
+                return messageList.get(messageId);
+            }
+        } finally {
+            lock.unlock();
         }
-
         return null;
     }
 
     public List<MessageContext> getMessages(int maxNumberOfMessages) {
-        List<MessageContext> returnList = new ArrayList<MessageContext>();
+        lock.lock();
+        try {
+            List<MessageContext> returnList = new ArrayList<MessageContext>();
+
+            Iterator<String> it = messageList.keySet().iterator();
+            while (it.hasNext() && maxNumberOfMessages > 0) {
+                returnList.add(messageList.get(it.next()));
+                maxNumberOfMessages--;
+            }
 
-        Iterator<String> it = messageList.keySet().iterator();
-        while (it.hasNext() && maxNumberOfMessages > 0) {
-            returnList.add(messageList.get(it.next()));
-            maxNumberOfMessages--;
+            return returnList;
+        } finally {
+            lock.unlock();
         }
-
-        return returnList;
     }
 
     public int getSize() {
-        return messageList.size();
+        lock.lock();
+        try {
+            return messageList.size();
+        } finally {
+            lock.unlock();
+        }
     }
 }
\ No newline at end of file