You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ch...@apache.org on 2011/05/20 17:34:56 UTC
svn commit: r1125433 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors:
ScheduledMessageProcessor.java
forward/ScheduledMessageForwardingProcessor.java sampler/SamplingJob.java
sampler/SamplingProcessor.java
Author: charith
Date: Fri May 20 15:34:56 2011
New Revision: 1125433
URL: http://svn.apache.org/viewvc?rev=1125433&view=rev
Log:
Adding sampling processor MBeans to enable/disable the processor
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java?rev=1125433&r1=1125432&r2=1125433&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/ScheduledMessageProcessor.java Fri May 20 15:34:56 2011
@@ -34,7 +34,7 @@ public abstract class ScheduledMessagePr
public static final String SCHEDULED_MESSAGE_PROCESSOR_GROUP =
"synapse.message.processor.quartz";
-
+ public static final String PROCESSOR_INSTANCE = "processor.instance";
/**
* The scheduler, run the the processor
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java?rev=1125433&r1=1125432&r2=1125433&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/forward/ScheduledMessageForwardingProcessor.java Fri May 20 15:34:56 2011
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.Atomi
public class ScheduledMessageForwardingProcessor extends ScheduledMessageProcessor{
public static final String BLOCKING_SENDER = "blocking.sender";
- public static final String PROCESSOR_INSTANCE = "processor.instance";
+
private BlockingMessageSender sender = null;
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java?rev=1125433&r1=1125432&r2=1125433&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingJob.java Fri May 20 15:34:56 2011
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.message.processors.MessageProcessorConsents;
+import org.apache.synapse.message.processors.ScheduledMessageProcessor;
import org.apache.synapse.message.store.MessageStore;
import org.quartz.Job;
import org.quartz.JobDataMap;
@@ -46,9 +47,17 @@ public class SamplingJob implements Job
Map<String, Object> parameters = (Map<String, Object>) jdm.get(
MessageProcessorConsents.PARAMETERS);
+ SamplingProcessor processor = (SamplingProcessor)
+ jdm.get(ScheduledMessageProcessor.PROCESSOR_INSTANCE);
+
final Object concurrency = jdm.get(SamplingProcessor.CONCURRENCY);
final String sequence = (String) parameters.get(SamplingProcessor.SEQUENCE);
+ // if processor is not active we do not proceed with the processing
+ if(!processor.isActive()) {
+ return;
+ }
+
int conc = 1;
if (concurrency instanceof Integer) {
conc = (Integer) concurrency;
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java?rev=1125433&r1=1125432&r2=1125433&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/message/processors/sampler/SamplingProcessor.java Fri May 20 15:34:56 2011
@@ -21,17 +21,35 @@ package org.apache.synapse.message.proce
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.message.processors.AbstractMessageProcessor;
import org.apache.synapse.message.processors.ScheduledMessageProcessor;
+import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.SchedulerException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class SamplingProcessor extends ScheduledMessageProcessor{
private Log log = LogFactory.getLog(SamplingProcessor.class);
public static final String CONCURRENCY = "concurrency";
public static final String SEQUENCE = "sequence";
+ private AtomicBoolean active = new AtomicBoolean(true);
+
+ private SamplingProcessorView view;
+
+ @Override
+ public void init(SynapseEnvironment se) {
+ super.init(se);
+ view = new SamplingProcessorView(this);
+
+ // register MBean
+ org.apache.synapse.commons.jmx.MBeanRegistrar.getInstance().registerMBean(view,
+ "Message Sampling Processor view", getName());
+ }
+
@Override
protected JobDetail getJobDetail() {
JobDetail jobDetail = new JobDetail();
@@ -41,6 +59,14 @@ public class SamplingProcessor extends S
}
@Override
+ protected JobDataMap getJobDataMap() {
+ JobDataMap jdm = new JobDataMap();
+ jdm.put(PROCESSOR_INSTANCE,this);
+ return jdm;
+
+ }
+
+ @Override
public void destroy() {
try {
scheduler.deleteJob(name + "-sampling-job",
@@ -50,4 +76,20 @@ public class SamplingProcessor extends S
}
state = State.DESTROY;
}
+
+ public boolean isActive() {
+ return active.get();
+ }
+
+ public void activate() {
+ active.set(true);
+ }
+
+ public void deactivate() {
+ active.set(false);
+ }
+
+ public SamplingProcessorView getView() {
+ return view;
+ }
}