You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/05/20 17:34:56 UTC

svn commit: r1125433 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors: ScheduledMessageProcessor.java forward/ScheduledMessageForwardingProcessor.java sampler/SamplingJob.java sampler/SamplingProcessor.java

Author: charith
Date: Fri May 20 15:34:56 2011
New Revision: 1125433

URL: http://svn.apache.org/viewvc?rev=1125433&view=rev
Log:
adding sampling processor MBeans to enable disable the processor

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java?rev=1125433&r1=1125432&r2=1125433&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java Fri May 20 15:34:56 2011
@@ -34,7 +34,7 @@ public abstract class ScheduledMessagePr
 
     public static final String SCHEDULED_MESSAGE_PROCESSOR_GROUP =
             "synapse.message.processor.quartz";
-
+    public static final String PROCESSOR_INSTANCE = "processor.instance";
 
     /**
      * The scheduler, run the the processor

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java?rev=1125433&r1=1125432&r2=1125433&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java Fri May 20 15:34:56 2011
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.Atomi
 public class ScheduledMessageForwardingProcessor extends ScheduledMessageProcessor{
 
     public static final String BLOCKING_SENDER = "blocking.sender";
-    public static final String PROCESSOR_INSTANCE = "processor.instance";
+
 
     private BlockingMessageSender sender = null;
 

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1125433&r1=1125432&r2=1125433&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java Fri May 20 15:34:56 2011
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.synapse.Mediator;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.message.processors.MessageProcessorConsents;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
 import org.apache.synapse.message.store.MessageStore;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
@@ -46,9 +47,17 @@ public class SamplingJob implements Job 
 
         Map<String, Object> parameters = (Map<String, Object>) jdm.get(
                 MessageProcessorConsents.PARAMETERS);
+        SamplingProcessor processor = (SamplingProcessor)
+                jdm.get(ScheduledMessageProcessor.PROCESSOR_INSTANCE);
+
         final Object concurrency = jdm.get(SamplingProcessor.CONCURRENCY);
         final String sequence = (String) parameters.get(SamplingProcessor.SEQUENCE);
 
+        // if processor is not active we do not proceed with the processing
+        if(!processor.isActive()) {
+            return;
+        }
+
         int conc = 1;
         if (concurrency instanceof Integer) {
             conc = (Integer) concurrency;

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1125433&r1=1125432&r2=1125433&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java Fri May 20 15:34:56 2011
@@ -21,17 +21,35 @@ package org.apache.synapse.message.proce
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.message.processors.AbstractMessageProcessor;
 import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
 import org.quartz.SchedulerException;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class SamplingProcessor extends ScheduledMessageProcessor{
     private Log log = LogFactory.getLog(SamplingProcessor.class);
 
     public static final String CONCURRENCY = "concurrency";
     public static final String SEQUENCE = "sequence";
 
+    private AtomicBoolean active = new AtomicBoolean(true);
+
+    private SamplingProcessorView view;
+
+    @Override
+    public void init(SynapseEnvironment se) {
+        super.init(se);
+        view = new SamplingProcessorView(this);
+
+        // register MBean
+        org.apache.synapse.commons.jmx.MBeanRegistrar.getInstance().registerMBean(view,
+                "Message Sampling Processor view", getName());
+    }
+
     @Override
     protected JobDetail getJobDetail() {
         JobDetail jobDetail = new JobDetail();
@@ -41,6 +59,14 @@ public class SamplingProcessor extends S
     }
 
     @Override
+    protected JobDataMap getJobDataMap() {
+        JobDataMap jdm = new JobDataMap();
+        jdm.put(PROCESSOR_INSTANCE,this);
+        return jdm;
+
+    }
+
+    @Override
     public void destroy() {
          try {
             scheduler.deleteJob(name + "-sampling-job",
@@ -50,4 +76,20 @@ public class SamplingProcessor extends S
         }
         state = State.DESTROY;
     }
+
+    public boolean isActive() {
+        return active.get();
+    }
+
+    public void activate() {
+        active.set(true);
+    }
+
+    public void deactivate() {
+        active.set(false);
+    }
+
+    public SamplingProcessorView getView() {
+        return view;
+    }
 }