You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/02/18 07:56:44 UTC

svn commit: r1071899 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors: AbstractMessageProcessor.java dlc/DeadLetterChannelView.java dlc/RedeliveryJob.java dlc/RedeliveryProcessor.java

Author: charith
Date: Fri Feb 18 06:56:44 2011
New Revision: 1071899

URL: http://svn.apache.org/viewvc?rev=1071899&view=rev
Log:
Fixing bugs in the DLC (dead letter channel) redelivery processor

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java?rev=1071899&r1=1071898&r2=1071899&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/AbstractMessageProcessor.java Fri Feb 18 06:56:44 2011
@@ -46,8 +46,8 @@ public abstract class AbstractMessagePro
     /** Message Store associated with Message processor */
     protected MessageStore messageStore;
 
-    /** The interval at which this processor runs */
-    protected long interval = 1;
+    /** The interval at which this processor runs; the default value is 1000 ms. */
+    protected long interval = 1000;
 
 
 
@@ -119,7 +119,11 @@ public abstract class AbstractMessagePro
     }
 
     public void setMessageStore(MessageStore messageStore) {
-        this.messageStore = messageStore;
+        if (messageStore != null) {
+            this.messageStore = messageStore;
+        } else {
+            throw new SynapseException("Error Can't set Message store to null");
+        }
     }
 
     public void setParameters(Map<String, Object> parameters) {

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java?rev=1071899&r1=1071898&r2=1071899&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/DeadLetterChannelView.java Fri Feb 18 06:56:44 2011
@@ -23,12 +23,13 @@ import org.apache.synapse.MessageContext
 import org.apache.synapse.SynapseArtifact;
 import org.apache.synapse.SynapseException;
 import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.message.store.AbstractMessageStore;
 import org.apache.synapse.message.store.MessageStore;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeadLetterChannelView implements DeadLetterChannelViewMBean{
+public class DeadLetterChannelView implements DeadLetterChannelViewMBean {
 
     private MessageStore messageStore;
 
@@ -41,29 +42,37 @@ public class DeadLetterChannelView imple
     }
 
     public void resendAll() {
-        int size = messageStore.getSize();
-        for(int i = 0; i < size ; i++) {
-            MessageContext messageContext = messageStore.unstore(0,0).get(0);
-            if(messageContext != null) {
-                redeliver(messageContext);
-            }
+        if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
+            int size = messageStore.getSize();
+            for (int i = 0; i < size; i++) {
+                MessageContext messageContext = messageStore.unstore(0, 0).get(0);
+                if (messageContext != null) {
+                    redeliver(messageContext);
+                }
 
+            }
+        } else {
+            throw new SynapseException("Error Message store being used re try later");
         }
     }
 
     public void deleteAll() {
-        int size = messageStore.getSize();
-        for(int i = 0; i < size ; i++) {
-            messageStore.unstore(0,0);
+        if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
+            int size = messageStore.getSize();
+            for (int i = 0; i < size; i++) {
+                messageStore.unstore(0, 0);
+            }
+        } else {
+            throw new SynapseException("Error Message store being used re try later");
         }
     }
 
     public List<String> getMessageIds() {
-       int size = messageStore.getSize();
-       List<String> list = new ArrayList<String>();
-        for(int i = 0; i < size ; i++) {
-            MessageContext messageContext = messageStore.getMessages(0,0).get(0);
-            if(messageContext != null) {
+        int size = messageStore.getSize();
+        List<String> list = new ArrayList<String>();
+        for (int i = 0; i < size; i++) {
+            MessageContext messageContext = messageStore.getMessages(0, 0).get(0);
+            if (messageContext != null) {
                 list.add(messageContext.getMessageID());
             }
         }
@@ -71,20 +80,26 @@ public class DeadLetterChannelView imple
     }
 
     public void resend(String messageID) {
-       MessageContext messageContext = messageStore.getMessage(messageID);
-       if(messageContext != null) {
-            redeliver(messageContext);
-       }
+        if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
+            MessageContext messageContext = messageStore.unstore(messageID);
+            if (messageContext != null) {
+                redeliver(messageContext);
+            }
+        } else {
+            throw new SynapseException("Error Message store being used re try later");
+        }
     }
 
     public void delete(String messageID) {
-        messageStore.unstore(messageID);
+        if (((AbstractMessageStore) messageStore).getLock().tryLock()) {
+            messageStore.unstore(messageID);
+        }
     }
 
     public String getEnvelope(String messageID) {
         MessageContext messageContext = messageStore.getMessage(messageID);
-        if(messageContext != null) {
-            return  messageContext.getEnvelope().toString();
+        if (messageContext != null) {
+            return messageContext.getEnvelope().toString();
         }
 
         return null;

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java?rev=1071899&r1=1071898&r2=1071899&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryJob.java Fri Feb 18 06:56:44 2011
@@ -56,8 +56,17 @@ public class RedeliveryJob implements Jo
         lock = ((AbstractMessageStore) messageStore).getLock();
         Map<String, Object> parameters = (Map<String, Object>) jdm.get(
                 MessageProcessorConsents.PARAMETERS);
-        maxNumberOfRedelivers = Integer.parseInt((String) parameters.get(DLCConstents.
-                MAX_REDELIVERY_COUNT));
+        Object o = parameters.get(DLCConstents.MAX_REDELIVERY_COUNT);
+        if(o == null) {
+            maxNumberOfRedelivers = 0;
+        } else {
+            maxNumberOfRedelivers = Integer.parseInt((String)o);
+        }
+
+        /** We do not access the message store if the max number of redeliveries is less than or equal to 0. */
+        if(maxNumberOfRedelivers <= 0) {
+            return;
+        }
 
         /**
          * We will keep the message store lock till the redelivery over
@@ -130,13 +139,17 @@ public class RedeliveryJob implements Jo
    static boolean handleEndpointReplay(Endpoint endpoint, MessageContext messageContext) {
         setFaultHandler(messageContext);
         if (endpoint != null && messageContext != null) {
+            if (endpoint.readyToSend()) {
+                endpoint.send(messageContext);
+                return true;
+            } else {
+                return false;
+            }
+
+        } else {
             return false;
-        } else if (endpoint.readyToSend()) {
-            endpoint.send(messageContext);
-            return true;
         }
 
-        return false;
     }
 
     /**
@@ -146,8 +159,11 @@ public class RedeliveryJob implements Jo
      * @return  true of success
      */
     static boolean handleSequenceReplay(Mediator mediator, MessageContext messageContext) {
-        setFaultHandler(messageContext);
-        mediator.mediate(messageContext);
+        if (mediator != null && messageContext != null) {
+            setFaultHandler(messageContext);
+            mediator.mediate(messageContext);
+            return true;
+        }
         return true;
     }
 
@@ -174,10 +190,12 @@ public class RedeliveryJob implements Jo
         String replayFaultHandler = (String)messageContext.getProperty(
                 DLCConstents.REPLAY_FAULT_HANDLER);
         if(replayFaultHandler != null) {
-            if(messageContext.getEndpoint(replayFaultHandler) != null ) {
+            if(messageContext.getConfiguration().getDefinedEndpoints().
+                    containsKey(replayFaultHandler)) {
                 Endpoint ep = messageContext.getEndpoint(replayFaultHandler);
                 messageContext.pushFaultHandler((FaultHandler)ep);
-            } else if (messageContext.getSequence(replayFaultHandler) != null) {
+            } else if (messageContext.getConfiguration().getDefinedSequences().
+                    containsKey(replayFaultHandler)) {
                 Mediator mediator = messageContext.getSequence(replayFaultHandler);
                 MediatorFaultHandler faultHandler = new MediatorFaultHandler(mediator);
                 messageContext.pushFaultHandler(faultHandler);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java?rev=1071899&r1=1071898&r2=1071899&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/dlc/RedeliveryProcessor.java Fri Feb 18 06:56:44 2011
@@ -18,29 +18,36 @@
  */
 package org.apache.synapse.message.processors.dlc;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.*;
-import org.apache.synapse.core.SynapseEnvironment;
-import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.message.processors.AbstractMessageProcessor;
-import org.apache.synapse.message.processors.MessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
-import org.apache.synapse.securevault.commons.MBeanRegistrar;
 import org.quartz.JobDetail;
 
-import java.util.Map;
-
 /**
  * Redelivery processor is the Message processor which implements the Dead letter channel EIP
  * It will Time to time Redeliver the Messages to a given target.
  */
 public class RedeliveryProcessor extends AbstractMessageProcessor{
-      @Override
+
+    /**Dead Letter channel JMX API*/
+    private DeadLetterChannelView dlcView;
+
+    @Override
+    public void setMessageStore(MessageStore messageStore) {
+        super.setMessageStore(messageStore);
+        dlcView = new DeadLetterChannelView(messageStore);
+        org.apache.synapse.commons.jmx.MBeanRegistrar.getInstance().registerMBean(dlcView,
+                "Dead Letter Channel", messageStore.getName());
+    }
+
+    @Override
     protected JobDetail getJobDetail() {
         JobDetail jobDetail = new JobDetail();
         jobDetail.setName(messageStore.getName() + "- redelivery job");
         jobDetail.setJobClass(RedeliveryJob.class);
         return jobDetail;
     }
+
+    public DeadLetterChannelView getDlcView() {
+        return dlcView;
+    }
 }