You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2007/05/29 08:11:53 UTC
svn commit: r542420 - in /webservices/axis2/trunk/java/modules/clustering:
./ src/org/apache/axis2/clustering/configuration/
src/org/apache/axis2/clustering/context/
src/org/apache/axis2/clustering/context/commands/
src/org/apache/axis2/clustering/tribes/
Author: azeez
Date: Mon May 28 23:11:51 2007
New Revision: 542420
URL: http://svn.apache.org/viewvc?view=rev&rev=542420
Log:
Using a Queue to buffer the incoming clustering messages. Another thread will process the messages from this buffer.
Modified:
webservices/axis2/trunk/java/modules/clustering/pom.xml
webservices/axis2/trunk/java/modules/clustering/project.xml
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/pom.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/pom.xml?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/pom.xml (original)
+++ webservices/axis2/trunk/java/modules/clustering/pom.xml Mon May 28 23:11:51 2007
@@ -42,10 +42,6 @@
<groupId>org.apache.tomcat</groupId>
<artifactId>juli</artifactId>
</dependency>
- <dependency>
- <groupId>backport-util-concurrent</groupId>
- <artifactId>backport-util-concurrent</artifactId>
- </dependency>
</dependencies>
<build>
<sourceDirectory>src</sourceDirectory>
Modified: webservices/axis2/trunk/java/modules/clustering/project.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/project.xml?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/project.xml (original)
+++ webservices/axis2/trunk/java/modules/clustering/project.xml Mon May 28 23:11:51 2007
@@ -131,14 +131,6 @@
<module>true</module>
</properties>
</dependency>
- <dependency>
- <groupId>backport-util-concurrent</groupId>
- <artifactId>backport-util-concurrent</artifactId>
- <version>${backport_util_concurrent.version}</version>
- <properties>
- <module>true</module>
- </properties>
- </dependency>
</dependencies>
<build/>
<reports/>
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java Mon May 28 23:11:51 2007
@@ -170,7 +170,7 @@
this.sender = sender;
}
- public void notifyListener(ConfigurationClusteringCommand command) throws ClusteringFault {
+ public void process(ConfigurationClusteringCommand command) throws ClusteringFault {
switch (command.getCommandType()) {
case ConfigurationClusteringCommand.RELOAD_CONFIGURATION:
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Mon May 28 23:11:51 2007
@@ -71,7 +71,7 @@
(context instanceof ServiceGroupContext);
}
- public void notifyListener(ContextClusteringCommand command) throws ClusteringFault {
+ public void process(ContextClusteringCommand command) throws ClusteringFault {
switch (command.getCommandType()) {
case ContextClusteringCommand.CREATE_SERVICE_CONTEXT:
case ContextClusteringCommand.CREATE_SERVICE_GROUP_CONTEXT:
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java Mon May 28 23:11:51 2007
@@ -34,6 +34,7 @@
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
ServiceGroupContext sgCtx =
configurationContext.getServiceGroupContext(serviceGroupContextId);
+ System.err.println("###### Gonna update SG prop in " + serviceGroupContextId + "===" + sgCtx);
if (sgCtx != null) {
propertyUpdater.updateProperties(sgCtx);
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Mon May 28 23:11:51 2007
@@ -16,16 +16,15 @@
package org.apache.axis2.clustering.tribes;
-import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand;
import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
import org.apache.axis2.clustering.context.DefaultContextManager;
import org.apache.axis2.clustering.control.ControlCommand;
-import org.apache.axis2.util.threadpool.ThreadPool;
import org.apache.catalina.tribes.Member;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import sun.misc.Queue;
import java.io.Serializable;
@@ -33,19 +32,28 @@
public class ChannelListener implements org.apache.catalina.tribes.ChannelListener {
private static final Log log = LogFactory.getLog(ChannelListener.class);
- private ThreadPool threadPool;
-
private DefaultContextManager contextManager;
private DefaultConfigurationManager configurationManager;
private TribesControlCommandProcessor controlCommandProcessor;
+ /**
+ * The messages received are enqued. Another thread, messageProcessor, will
+ * process these messages in the order that they were received.
+ */
+ private final Queue cmdQueue = new Queue();
+
+ /**
+ * The thread which picks up messages from the cmdQueue and processes them.
+ */
+ private Thread messageProcessor;
+
public ChannelListener(DefaultConfigurationManager configurationManager,
DefaultContextManager contextManager,
TribesControlCommandProcessor controlCommandProcessor) {
this.configurationManager = configurationManager;
this.contextManager = contextManager;
this.controlCommandProcessor = controlCommandProcessor;
- this.threadPool = new ThreadPool();
+ startMessageProcessor();
}
public void setContextManager(DefaultContextManager contextManager) {
@@ -62,36 +70,68 @@
public void messageReceived(Serializable msg, Member sender) {
log.debug("Message received : " + msg);
- threadPool.execute(new MessageHandler(msg, sender));
+ synchronized (cmdQueue) {
+ cmdQueue.enqueue(new MemberMessage(msg, sender));
+ }
+ if (!messageProcessor.isAlive()) {
+ startMessageProcessor();
+ }
}
- private class MessageHandler implements Runnable {
- private Serializable msg;
+ private void startMessageProcessor() {
+ messageProcessor = new Thread(new MessageProcessor(), "ClusteringInComingMessageProcessor");
+ messageProcessor.setDaemon(true);
+ messageProcessor.start();
+ messageProcessor.setPriority(Thread.MAX_PRIORITY);
+ }
+
+ /**
+ * A container to hold a message and its sender
+ */
+ private class MemberMessage {
+ private Serializable message;
private Member sender;
- public MessageHandler(Serializable msg, Member sender) {
- this.msg = msg;
+ public MemberMessage(Serializable msg, Member sender) {
+ this.message = msg;
this.sender = sender;
}
+ public Serializable getMessage() {
+ return message;
+ }
+
+ public Member getSender() {
+ return sender;
+ }
+ }
+
+ /**
+ * A processor which continously polls for messages in the cmdQueue and processes them
+ */
+ private class MessageProcessor implements Runnable {
public void run() {
- if (msg instanceof ContextClusteringCommand) {
- try {
- contextManager.notifyListener((ContextClusteringCommand) msg);
- } catch (ClusteringFault e) {
- log.error("Could not process ContextCommand", e);
- }
- } else if (msg instanceof ConfigurationClusteringCommand) {
- try {
- configurationManager.notifyListener((ConfigurationClusteringCommand) msg);
- } catch (ClusteringFault e) {
- log.error("Could not process ConfigurationCommand", e);
- }
- } else if (msg instanceof ControlCommand) {
+ while (true) {
+ MemberMessage memberMessage = null;
try {
- controlCommandProcessor.process((ControlCommand) msg, sender);
- } catch (ClusteringFault e) {
- log.error("Could not process ControlCommand", e);
+ if (!cmdQueue.isEmpty()) {
+ memberMessage = (MemberMessage) cmdQueue.dequeue();
+ } else {
+ Thread.sleep(1);
+ continue;
+ }
+
+ Serializable msg = memberMessage.getMessage();
+ if (msg instanceof ContextClusteringCommand) {
+ contextManager.process((ContextClusteringCommand) msg);
+ } else if (msg instanceof ConfigurationClusteringCommand) {
+ configurationManager.process((ConfigurationClusteringCommand) msg);
+ } else if (msg instanceof ControlCommand) {
+ controlCommandProcessor.process((ControlCommand) msg,
+ memberMessage.getSender());
+ }
+ } catch (Throwable e) {
+ log.error("Could not process message ", e);
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon May 28 23:11:51 2007
@@ -34,6 +34,7 @@
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.membership.McastService;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
import org.apache.commons.logging.Log;
@@ -91,11 +92,11 @@
byte[] domain;
if (domainParam != null) {
domain = ((String) domainParam.getValue()).getBytes();
- channel.getMembershipService().setDomain(domain);
} else {
domain = "apache.axis2.domain".getBytes();
- channel.getMembershipService().setDomain(domain);
}
+ channel.getMembershipService().setDomain(domain);
+
DomainFilterInterceptor dfi = new DomainFilterInterceptor();
dfi.setDomain(domain);
channel.addInterceptor(dfi);
@@ -124,6 +125,7 @@
sender.setChannel(channel);
contextManager.setSender(sender);
configurationManager.setSender(sender);
+
listener.setContextManager(contextManager);
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org