You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by su...@apache.org on 2011/02/15 19:44:35 UTC
svn commit: r1071010 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message:
processors/sampler/SamplingJob.java
processors/sampler/SamplingProcessor.java store/InMemoryMessageStore.java
Author: supun
Date: Tue Feb 15 18:44:35 2011
New Revision: 1071010
URL: http://svn.apache.org/viewvc?rev=1071010&view=rev
Log:
Improving the sampler and improving the in-memory message store to use synchronization.
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1071010&r1=1071009&r2=1071010&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java Tue Feb 15 18:44:35 2011
@@ -31,6 +31,7 @@ import org.quartz.JobExecutionException;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
public class SamplingJob implements Job {
private static Log log = LogFactory.getLog(SamplingJob.class);
@@ -42,6 +43,7 @@ public class SamplingJob implements Job
final MessageStore messageStore = (MessageStore) jdm.get(SamplingProcessor.MESSAGE_STORE);
final ExecutorService executor = (ExecutorService) jdm.get(SamplingProcessor.EXECUTOR);
final String sequence = (String) jdm.get(SamplingProcessor.SEQUENCE);
+ final Lock lock = (Lock) jdm.get(SamplingProcessor.LOCK);
int conc = 1;
if (concurrency instanceof Integer) {
@@ -49,24 +51,27 @@ public class SamplingJob implements Job
}
for (int i = 0; i < conc; i++) {
- List<MessageContext> list = messageStore.getMessages(0, 0);
-
- if (list != null && list.size() == 1) {
- final MessageContext messageContext = list.get(0);
- if (messageContext != null) {
- messageStore.unstore(0, 0);
- executor.submit(new Runnable() {
- public void run() {
- try {
- Mediator processingSequence = messageContext.getSequence(sequence);
- if (processingSequence != null) {
- processingSequence.mediate(messageContext);
+ //lock.lock();
+ synchronized (messageStore){
+ List<MessageContext> list = messageStore.getMessages(0, 0);
+
+ if (list != null && list.size() == 1) {
+ final MessageContext messageContext = list.get(0);
+ if (messageContext != null) {
+ messageStore.unstore(0, 0);
+ executor.submit(new Runnable() {
+ public void run() {
+ try {
+ Mediator processingSequence = messageContext.getSequence(sequence);
+ if (processingSequence != null) {
+ processingSequence.mediate(messageContext);
+ }
+ } catch (Throwable t) {
+ log.error("Error occurred while executing the message", t);
}
- } catch (Throwable t) {
- log.error("Error occurred while executing the message", t);
}
- }
- });
+ });
+ }
}
}
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1071010&r1=1071009&r2=1071010&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java Tue Feb 15 18:44:35 2011
@@ -31,10 +31,13 @@ import org.quartz.impl.StdSchedulerFacto
import java.text.ParseException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
public class SamplingProcessor implements MessageProcessor {
private Log log = LogFactory.getLog(SamplingProcessor.class);
+ public static final String LOCK = "lock";
public static final String EXECUTOR = "Executor";
public static final String MESSAGE_STORE = "MESSAGE_STORE";
public static final String QUARTZ_CONF = "quartz.conf";
@@ -80,6 +83,8 @@ public class SamplingProcessor implement
/** A sequence to run when the sampler is executed */
private String sequence = null;
+ private Lock lock = new ReentrantLock();
+
/**
* Creates a Quartz Scheduler and schedule the message processing logic.
*/
@@ -108,6 +113,7 @@ public class SamplingProcessor implement
jobDataMap.put(EXECUTOR, executor);
jobDataMap.put(MESSAGE_STORE, messageStore);
jobDataMap.put(SEQUENCE, sequence);
+ jobDataMap.put(LOCK, lock);
jobDetail.setJobDataMap(jobDataMap);
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java?rev=1071010&r1=1071009&r2=1071010&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/store/InMemoryMessageStore.java Tue Feb 15 18:44:35 2011
@@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFac
import org.apache.synapse.MessageContext;
import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* InMemory Message store will store Failed Messages in the local memory
@@ -35,98 +37,149 @@ public class InMemoryMessageStore extend
/** The map that keeps the stored messages */
private Map<String, MessageContext> messageList = new HashMap<String, MessageContext>();
+ private Lock lock = new ReentrantLock();
+
public void store(MessageContext messageContext) {
- if (messageContext != null) {
- mediateSequence(messageContext);
- messageList.put(messageContext.getMessageID(), messageContext);
-
- if (log.isDebugEnabled()) {
- log.debug("Message " + messageContext.getMessageID() +
- " has been stored");
+ lock.lock();
+ try {
+ if (messageContext != null) {
+ mediateSequence(messageContext);
+ messageList.put(messageContext.getMessageID(), messageContext);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Message " + messageContext.getMessageID() +
+ " has been stored");
+ }
}
+ } finally {
+ lock.unlock();
}
}
public MessageContext unstore(String messageID) {
- if (messageID != null) {
- return messageList.remove(messageID);
+ lock.lock();
+ try {
+ if (messageID != null) {
+ return messageList.remove(messageID);
+ }
+ } finally {
+ lock.unlock();
}
return null;
}
public List<MessageContext> unstoreAll() {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
- for (String k : messageList.keySet()) {
- returnList.add(messageList.remove(k));
+ lock.lock();
+ try {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
+ for (String k : messageList.keySet()) {
+ returnList.add(messageList.remove(k));
+ }
+ return returnList;
+ } finally {
+ lock.unlock();
}
- return returnList;
}
public List<MessageContext> unstore(int maxNumberOfMessages) {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
- Iterator<String> it = messageList.keySet().iterator();
- while (it.hasNext() && maxNumberOfMessages > 0) {
- returnList.add(messageList.get(it.next()));
- maxNumberOfMessages--;
- }
+ lock.lock();
+ try {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
+ Iterator<String> it = messageList.keySet().iterator();
+ while (it.hasNext() && maxNumberOfMessages > 0) {
+ returnList.add(messageList.get(it.next()));
+ maxNumberOfMessages--;
+ }
- return returnList;
+ return returnList;
+ } finally {
+ lock.unlock();
+ }
}
public List<MessageContext> unstore(int from, int to) {
- List<MessageContext> returnlist = new ArrayList<MessageContext>();
- if (from <= to && (from <= messageList.size() && to <= messageList.size()) && messageList.size() > 0) {
-
- String[] keys = messageList.keySet().toArray(new String[messageList.keySet().size()]);
-
- for (int i = from; i <= to; i++) {
- returnlist.add(messageList.remove(keys[i]));
+ lock.lock();
+ try {
+ List<MessageContext> returnlist = new ArrayList<MessageContext>();
+ if (from <= to && (from <= messageList.size() && to <= messageList.size()) && messageList.size() > 0) {
+
+ String[] keys = messageList.keySet().toArray(new String[messageList.keySet().size()]);
+
+ for (int i = from; i <= to; i++) {
+ returnlist.add(messageList.remove(keys[i]));
+ }
}
+ return returnlist;
+ } finally {
+ lock.unlock();
}
- return returnlist;
}
public List<MessageContext> getMessages(int from, int to) {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
- if (from <= to && (from <= messageList.size() && to <= messageList.size()) && messageList.size() > 0) {
- String[] keys = messageList.keySet().toArray(new String[messageList.keySet().size()]);
-
- for (int i = from; i <= to; i++) {
- returnList.add(messageList.get(keys[i]));
+ lock.lock();
+ try {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
+ if (from <= to && (from <= messageList.size() && to <= messageList.size()) && messageList.size() > 0) {
+ String[] keys = messageList.keySet().toArray(new String[messageList.keySet().size()]);
+
+ for (int i = from; i <= to; i++) {
+ returnList.add(messageList.get(keys[i]));
+ }
}
+ return returnList;
+ } finally {
+ lock.unlock();
}
- return returnList;
}
public List<MessageContext> getAllMessages() {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
- for (Map.Entry<String ,MessageContext> entry :messageList.entrySet()) {
- returnList.add(entry.getValue());
+ lock.lock();
+ try {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
+ for (Map.Entry<String, MessageContext> entry : messageList.entrySet()) {
+ returnList.add(entry.getValue());
+ }
+ return returnList;
+ } finally {
+ lock.unlock();
}
- return returnList;
}
public MessageContext getMessage(String messageId) {
- if (messageId != null) {
- return messageList.get(messageId);
+ lock.lock();
+ try {
+ if (messageId != null) {
+ return messageList.get(messageId);
+ }
+ } finally {
+ lock.unlock();
}
-
return null;
}
public List<MessageContext> getMessages(int maxNumberOfMessages) {
- List<MessageContext> returnList = new ArrayList<MessageContext>();
+ lock.lock();
+ try {
+ List<MessageContext> returnList = new ArrayList<MessageContext>();
+
+ Iterator<String> it = messageList.keySet().iterator();
+ while (it.hasNext() && maxNumberOfMessages > 0) {
+ returnList.add(messageList.get(it.next()));
+ maxNumberOfMessages--;
+ }
- Iterator<String> it = messageList.keySet().iterator();
- while (it.hasNext() && maxNumberOfMessages > 0) {
- returnList.add(messageList.get(it.next()));
- maxNumberOfMessages--;
+ return returnList;
+ } finally {
+ lock.unlock();
}
-
- return returnList;
}
public int getSize() {
- return messageList.size();
+ lock.lock();
+ try {
+ return messageList.size();
+ } finally {
+ lock.unlock();
+ }
}
}
\ No newline at end of file