You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/02/18 07:56:44 UTC
svn commit: r1071899 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors:
AbstractMessageProcessor.java dlc/DeadLetterChannelView.java
dlc/RedeliveryJob.java dlc/RedeliveryProcessor.java
Author: charith
Date: Fri Feb 18 06:56:44 2011
New Revision: 1071899
URL: http://svn.apache.org/viewvc?rev=1071899&view=rev
Log:
fixing bugs in dlc redelivery processor
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java?rev=1071899&r1=1071898&r2=1071899&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java Fri Feb 18 06:56:44 2011
@@ -46,8 +46,8 @@ public abstract class AbstractMessagePro
/** Message Store associated with Message processor */
protected MessageStore messageStore;
- /** The interval at which this processor runs */
- protected long interval = 1;
+ /** The interval at which this processor runs , default value is 1000ms*/
+ protected long interval = 1000;
@@ -119,7 +119,11 @@ public abstract class AbstractMessagePro
}
public void setMessageStore(MessageStore messageStore) {
- this.messageStore = messageStore;
+ if (messageStore != null) {
+ this.messageStore = messageStore;
+ } else {
+ throw new SynapseException("Error Can't set Message store to null");
+ }
}
public void setParameters(Map<String, Object> parameters) {
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java?rev=1071899&r1=1071898&r2=1071899&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java Fri Feb 18 06:56:44 2011
@@ -23,12 +23,13 @@ import org.apache.synapse.MessageContext
import org.apache.synapse.SynapseArtifact;
import org.apache.synapse.SynapseException;
import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.store.AbstractMessageStore;
import org.apache.synapse.message.store.MessageStore;
import java.util.ArrayList;
import java.util.List;
-public class DeadLetterChannelView implements DeadLetterChannelViewMBean{
+public class DeadLetterChannelView implements DeadLetterChannelViewMBean {
private MessageStore messageStore;
@@ -41,29 +42,37 @@ public class DeadLetterChannelView imple
}
public void resendAll() {
- int size = messageStore.getSize();
- for(int i = 0; i < size ; i++) {
- MessageContext messageContext = messageStore.unstore(0,0).get(0);
- if(messageContext != null) {
- redeliver(messageContext);
- }
+ if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
+ int size = messageStore.getSize();
+ for (int i = 0; i < size; i++) {
+ MessageContext messageContext = messageStore.unstore(0, 0).get(0);
+ if (messageContext != null) {
+ redeliver(messageContext);
+ }
+ }
+ } else {
+ throw new SynapseException("Error Message store being used re try later");
}
}
public void deleteAll() {
- int size = messageStore.getSize();
- for(int i = 0; i < size ; i++) {
- messageStore.unstore(0,0);
+ if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
+ int size = messageStore.getSize();
+ for (int i = 0; i < size; i++) {
+ messageStore.unstore(0, 0);
+ }
+ } else {
+ throw new SynapseException("Error Message store being used re try later");
}
}
public List<String> getMessageIds() {
- int size = messageStore.getSize();
- List<String> list = new ArrayList<String>();
- for(int i = 0; i < size ; i++) {
- MessageContext messageContext = messageStore.getMessages(0,0).get(0);
- if(messageContext != null) {
+ int size = messageStore.getSize();
+ List<String> list = new ArrayList<String>();
+ for (int i = 0; i < size; i++) {
+ MessageContext messageContext = messageStore.getMessages(0, 0).get(0);
+ if (messageContext != null) {
list.add(messageContext.getMessageID());
}
}
@@ -71,20 +80,26 @@ public class DeadLetterChannelView imple
}
public void resend(String messageID) {
- MessageContext messageContext = messageStore.getMessage(messageID);
- if(messageContext != null) {
- redeliver(messageContext);
- }
+ if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
+ MessageContext messageContext = messageStore.unstore(messageID);
+ if (messageContext != null) {
+ redeliver(messageContext);
+ }
+ } else {
+ throw new SynapseException("Error Message store being used re try later");
+ }
}
public void delete(String messageID) {
- messageStore.unstore(messageID);
+ if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
+ messageStore.unstore(messageID);
+ }
}
public String getEnvelope(String messageID) {
MessageContext messageContext = messageStore.getMessage(messageID);
- if(messageContext != null) {
- return messageContext.getEnvelope().toString();
+ if (messageContext != null) {
+ return messageContext.getEnvelope().toString();
}
return null;
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java?rev=1071899&r1=1071898&r2=1071899&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java Fri Feb 18 06:56:44 2011
@@ -56,8 +56,17 @@ public class RedeliveryJob implements Jo
lock = ((AbstractMessageStore) messageStore).getLock();
Map<String, Object> parameters = (Map<String, Object>) jdm.get(
MessageProcessorConsents.PARAMETERS);
- maxNumberOfRedelivers = Integer.parseInt((String) parameters.get(DLCConstents.
- MAX_REDELIVERY_COUNT));
+ Object o = parameters.get(DLCConstents.MAX_REDELIVERY_COUNT);
+ if(o == null) {
+ maxNumberOfRedelivers = 0;
+ } else {
+ maxNumberOfRedelivers = Integer.parseInt((String)o);
+ }
+
+ /** We are not going to access message Store if max number of redelivery is less than 0*/
+ if(maxNumberOfRedelivers <= 0) {
+ return;
+ }
/**
* We will keep the message store lock till the redelivery over
@@ -130,13 +139,17 @@ public class RedeliveryJob implements Jo
static boolean handleEndpointReplay(Endpoint endpoint, MessageContext messageContext) {
setFaultHandler(messageContext);
if (endpoint != null && messageContext != null) {
+ if (endpoint.readyToSend()) {
+ endpoint.send(messageContext);
+ return true;
+ } else {
+ return false;
+ }
+
+ } else {
return false;
- } else if (endpoint.readyToSend()) {
- endpoint.send(messageContext);
- return true;
}
- return false;
}
/**
@@ -146,8 +159,11 @@ public class RedeliveryJob implements Jo
* @return true of success
*/
static boolean handleSequenceReplay(Mediator mediator, MessageContext messageContext) {
- setFaultHandler(messageContext);
- mediator.mediate(messageContext);
+ if (mediator != null && messageContext != null) {
+ setFaultHandler(messageContext);
+ mediator.mediate(messageContext);
+ return true;
+ }
return true;
}
@@ -174,10 +190,12 @@ public class RedeliveryJob implements Jo
String replayFaultHandler = (String)messageContext.getProperty(
DLCConstents.REPLAY_FAULT_HANDLER);
if(replayFaultHandler != null) {
- if(messageContext.getEndpoint(replayFaultHandler) != null ) {
+ if(messageContext.getConfiguration().getDefinedEndpoints().
+ containsKey(replayFaultHandler)) {
Endpoint ep = messageContext.getEndpoint(replayFaultHandler);
messageContext.pushFaultHandler((FaultHandler)ep);
- } else if (messageContext.getSequence(replayFaultHandler) != null) {
+ } else if (messageContext.getConfiguration().getDefinedSequences().
+ containsKey(replayFaultHandler)) {
Mediator mediator = messageContext.getSequence(replayFaultHandler);
MediatorFaultHandler faultHandler = new MediatorFaultHandler(mediator);
messageContext.pushFaultHandler(faultHandler);
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java?rev=1071899&r1=1071898&r2=1071899&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java Fri Feb 18 06:56:44 2011
@@ -18,29 +18,36 @@
*/
package org.apache.synapse.message.processors.dlc;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.*;
-import org.apache.synapse.core.SynapseEnvironment;
-import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.message.processors.AbstractMessageProcessor;
-import org.apache.synapse.message.processors.MessageProcessor;
import org.apache.synapse.message.store.MessageStore;
-import org.apache.synapse.securevault.commons.MBeanRegistrar;
import org.quartz.JobDetail;
-import java.util.Map;
-
/**
* Redelivery processor is the Message processor which implements the Dead letter channel EIP
* It will Time to time Redeliver the Messages to a given target.
*/
public class RedeliveryProcessor extends AbstractMessageProcessor{
- @Override
+
+ /**Dead Letter channel JMX API*/
+ private DeadLetterChannelView dlcView;
+
+ @Override
+ public void setMessageStore(MessageStore messageStore) {
+ super.setMessageStore(messageStore);
+ dlcView = new DeadLetterChannelView(messageStore);
+ org.apache.synapse.commons.jmx.MBeanRegistrar.getInstance().registerMBean(dlcView,
+ "Dead Letter Channel", messageStore.getName());
+ }
+
+ @Override
protected JobDetail getJobDetail() {
JobDetail jobDetail = new JobDetail();
jobDetail.setName(messageStore.getName() + "- redelivery job");
jobDetail.setJobClass(RedeliveryJob.class);
return jobDetail;
}
+
+ public DeadLetterChannelView getDlcView() {
+ return dlcView;
+ }
}