You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2007/05/29 08:11:53 UTC

svn commit: r542420 - in /webservices/axis2/trunk/java/modules/clustering: ./ src/org/apache/axis2/clustering/configuration/ src/org/apache/axis2/clustering/context/ src/org/apache/axis2/clustering/context/commands/ src/org/apache/axis2/clustering/tribes/

Author: azeez
Date: Mon May 28 23:11:51 2007
New Revision: 542420

URL: http://svn.apache.org/viewvc?view=rev&rev=542420
Log:
Using a Queue to buffer the incoming clustering messages. Another thread will process the messages from this buffer.


Modified:
    webservices/axis2/trunk/java/modules/clustering/pom.xml
    webservices/axis2/trunk/java/modules/clustering/project.xml
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/pom.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/pom.xml?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/pom.xml (original)
+++ webservices/axis2/trunk/java/modules/clustering/pom.xml Mon May 28 23:11:51 2007
@@ -42,10 +42,6 @@
             <groupId>org.apache.tomcat</groupId>
             <artifactId>juli</artifactId>
         </dependency>
-        <dependency>
-            <groupId>backport-util-concurrent</groupId>
-            <artifactId>backport-util-concurrent</artifactId>
-        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src</sourceDirectory>

Modified: webservices/axis2/trunk/java/modules/clustering/project.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/project.xml?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/project.xml (original)
+++ webservices/axis2/trunk/java/modules/clustering/project.xml Mon May 28 23:11:51 2007
@@ -131,14 +131,6 @@
                 <module>true</module>
             </properties>
         </dependency>
-        <dependency>
-            <groupId>backport-util-concurrent</groupId>
-            <artifactId>backport-util-concurrent</artifactId>
-            <version>${backport_util_concurrent.version}</version>
-            <properties>
-                <module>true</module>
-            </properties>
-        </dependency>
     </dependencies>
     <build/>
     <reports/>

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java Mon May 28 23:11:51 2007
@@ -170,7 +170,7 @@
         this.sender = sender;
     }
 
-    public void notifyListener(ConfigurationClusteringCommand command) throws ClusteringFault {
+    public void process(ConfigurationClusteringCommand command) throws ClusteringFault {
 
         switch (command.getCommandType()) {
             case ConfigurationClusteringCommand.RELOAD_CONFIGURATION:

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Mon May 28 23:11:51 2007
@@ -71,7 +71,7 @@
                (context instanceof ServiceGroupContext);
     }
 
-    public void notifyListener(ContextClusteringCommand command) throws ClusteringFault {
+    public void process(ContextClusteringCommand command) throws ClusteringFault {
         switch (command.getCommandType()) {
             case ContextClusteringCommand.CREATE_SERVICE_CONTEXT:
             case ContextClusteringCommand.CREATE_SERVICE_GROUP_CONTEXT:

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java Mon May 28 23:11:51 2007
@@ -34,6 +34,7 @@
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
         ServiceGroupContext sgCtx =
                 configurationContext.getServiceGroupContext(serviceGroupContextId);
+        System.err.println("###### Gonna update SG prop in " + serviceGroupContextId + "===" + sgCtx);
         if (sgCtx != null) {
             propertyUpdater.updateProperties(sgCtx);
         }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Mon May 28 23:11:51 2007
@@ -16,16 +16,15 @@
 
 package org.apache.axis2.clustering.tribes;
 
-import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand;
 import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
 import org.apache.axis2.clustering.context.DefaultContextManager;
 import org.apache.axis2.clustering.control.ControlCommand;
-import org.apache.axis2.util.threadpool.ThreadPool;
 import org.apache.catalina.tribes.Member;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import sun.misc.Queue;
 
 import java.io.Serializable;
 
@@ -33,19 +32,28 @@
 public class ChannelListener implements org.apache.catalina.tribes.ChannelListener {
     private static final Log log = LogFactory.getLog(ChannelListener.class);
 
-    private ThreadPool threadPool;
-
     private DefaultContextManager contextManager;
     private DefaultConfigurationManager configurationManager;
     private TribesControlCommandProcessor controlCommandProcessor;
 
+    /**
+     * The messages received are enqued. Another thread, messageProcessor, will
+     * process these messages in the order that they were received.
+     */
+    private final Queue cmdQueue = new Queue();
+
+    /**
+     * The thread which picks up messages from the cmdQueue and processes them.
+     */
+    private Thread messageProcessor;
+
     public ChannelListener(DefaultConfigurationManager configurationManager,
                            DefaultContextManager contextManager,
                            TribesControlCommandProcessor controlCommandProcessor) {
         this.configurationManager = configurationManager;
         this.contextManager = contextManager;
         this.controlCommandProcessor = controlCommandProcessor;
-        this.threadPool = new ThreadPool();
+        startMessageProcessor();
     }
 
     public void setContextManager(DefaultContextManager contextManager) {
@@ -62,36 +70,68 @@
 
     public void messageReceived(Serializable msg, Member sender) {
         log.debug("Message received : " + msg);
-        threadPool.execute(new MessageHandler(msg, sender));
+        synchronized (cmdQueue) {
+            cmdQueue.enqueue(new MemberMessage(msg, sender));
+        }
+        if (!messageProcessor.isAlive()) {
+            startMessageProcessor();
+        }
     }
 
-    private class MessageHandler implements Runnable {
-        private Serializable msg;
+    private void startMessageProcessor() {
+        messageProcessor = new Thread(new MessageProcessor(), "ClusteringInComingMessageProcessor");
+        messageProcessor.setDaemon(true);
+        messageProcessor.start();
+        messageProcessor.setPriority(Thread.MAX_PRIORITY);
+    }
+
+    /**
+     * A container to hold a message and its sender
+     */
+    private class MemberMessage {
+        private Serializable message;
         private Member sender;
 
-        public MessageHandler(Serializable msg, Member sender) {
-            this.msg = msg;
+        public MemberMessage(Serializable msg, Member sender) {
+            this.message = msg;
             this.sender = sender;
         }
 
+        public Serializable getMessage() {
+            return message;
+        }
+
+        public Member getSender() {
+            return sender;
+        }
+    }
+
+    /**
+     * A processor which continously polls for messages in the cmdQueue and processes them
+     */
+    private class MessageProcessor implements Runnable {
         public void run() {
-            if (msg instanceof ContextClusteringCommand) {
-                try {
-                    contextManager.notifyListener((ContextClusteringCommand) msg);
-                } catch (ClusteringFault e) {
-                    log.error("Could not process ContextCommand", e);
-                }
-            } else if (msg instanceof ConfigurationClusteringCommand) {
-                try {
-                    configurationManager.notifyListener((ConfigurationClusteringCommand) msg);
-                } catch (ClusteringFault e) {
-                    log.error("Could not process ConfigurationCommand", e);
-                }
-            } else if (msg instanceof ControlCommand) {
+            while (true) {
+                MemberMessage memberMessage = null;
                 try {
-                    controlCommandProcessor.process((ControlCommand) msg, sender);
-                } catch (ClusteringFault e) {
-                    log.error("Could not process ControlCommand", e);
+                    if (!cmdQueue.isEmpty()) {
+                        memberMessage = (MemberMessage) cmdQueue.dequeue();
+                    } else {
+                        Thread.sleep(1);
+                        continue;
+                    }
+
+                    Serializable msg = memberMessage.getMessage();
+                    if (msg instanceof ContextClusteringCommand) {
+                        contextManager.process((ContextClusteringCommand) msg);
+                    } else if (msg instanceof ConfigurationClusteringCommand) {
+                        configurationManager.process((ConfigurationClusteringCommand) msg);
+                    } else if (msg instanceof ControlCommand) {
+                        controlCommandProcessor.process((ControlCommand) msg,
+                                                        memberMessage.getSender());
+                    }
+                } catch (Throwable e) {
+                    log.error("Could not process message ", e);
                 }
             }
         }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=542420&r1=542419&r2=542420
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon May 28 23:11:51 2007
@@ -34,6 +34,7 @@
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ManagedChannel;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.membership.McastService;
 import org.apache.catalina.tribes.group.GroupChannel;
 import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
 import org.apache.commons.logging.Log;
@@ -91,11 +92,11 @@
             byte[] domain;
             if (domainParam != null) {
                 domain = ((String) domainParam.getValue()).getBytes();
-                channel.getMembershipService().setDomain(domain);
             } else {
                 domain = "apache.axis2.domain".getBytes();
-                channel.getMembershipService().setDomain(domain);
             }
+            channel.getMembershipService().setDomain(domain);
+
             DomainFilterInterceptor dfi = new DomainFilterInterceptor();
             dfi.setDomain(domain);
             channel.addInterceptor(dfi);
@@ -124,6 +125,7 @@
             sender.setChannel(channel);
             contextManager.setSender(sender);
             configurationManager.setSender(sender);
+
 
             listener.setContextManager(contextManager);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org