You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by az...@apache.org on 2012/02/14 12:23:12 UTC

svn commit: r1243880 - in /axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes: Axis2ChannelListener.java Axis2GroupChannel.java ChannelSender.java ClassLoaderUtil.java TribesClusteringAgent.java

Author: azeez
Date: Tue Feb 14 11:23:12 2012
New Revision: 1243880

URL: http://svn.apache.org/viewvc?rev=1243880&view=rev
Log:
This fixes the issue of state replication failing if the classes of the replicated objects are contained within service archive & module archive files. We had to take control of the Tribes message deserialization mechanism at the channel level so that we can set the relevant classloaders before deserialization.


Added:
    axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java
    axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java
Modified:
    axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java
    axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java

Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java?rev=1243880&r1=1243879&r2=1243880&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java Tue Feb 14 11:23:12 2012
@@ -27,9 +27,6 @@ import org.apache.axis2.clustering.manag
 import org.apache.axis2.clustering.state.DefaultStateManager;
 import org.apache.axis2.clustering.state.StateClusteringCommand;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.AxisModule;
-import org.apache.axis2.description.AxisServiceGroup;
-import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.catalina.tribes.ByteMessage;
 import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.Member;
@@ -40,9 +37,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 
 /**
  * This is the Tribes channel listener which is used for listening on the channels, receiving
@@ -93,23 +87,11 @@ public class Axis2ChannelListener implem
      */
     public void messageReceived(Serializable msg, Member sender) {
         try {
-            AxisConfiguration configuration = configurationContext.getAxisConfiguration();
-            List<ClassLoader> classLoaders = new ArrayList<ClassLoader>();
-            classLoaders.add(configuration.getSystemClassLoader());
-            classLoaders.add(getClass().getClassLoader());
-            for (Iterator iter = configuration.getServiceGroups(); iter.hasNext();) {
-                AxisServiceGroup group = (AxisServiceGroup) iter.next();
-                classLoaders.add(group.getServiceGroupClassLoader());
-            }
-            for(Object obj: configuration.getModules().values()){    
-                AxisModule module = (AxisModule) obj;
-                classLoaders.add(module.getModuleClassLoader());
-            }
             byte[] message = ((ByteMessage) msg).getMessage();
             msg = XByteBuffer.deserialize(message,
                                           0,
                                           message.length,
-                                          classLoaders.toArray(new ClassLoader[classLoaders.size()]));
+                                          ClassLoaderUtil.getClassLoaders(configurationContext));
         } catch (Exception e) {
             String errMsg = "Cannot deserialize received message";
             log.error(errMsg, e);

Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java?rev=1243880&view=auto
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java (added)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java Tue Feb 14 11:23:12 2012
@@ -0,0 +1,105 @@
+/*
+*  Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.RpcMessage;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.util.Logs;
+
+import java.io.Serializable;
+
+/**
+ * A Tribes {@link GroupChannel} whose {@link #messageReceived(ChannelMessage)} deserializes
+ * incoming payloads with the class loaders returned by
+ * {@link ClassLoaderUtil#getClassLoaders(ConfigurationContext)} for the supplied context.
+ */
+public class Axis2GroupChannel extends GroupChannel{
+    /** Axis2 context handed to ClassLoaderUtil each time a message has to be deserialized. */
+    private ConfigurationContext configurationContext;
+    /** @param configurationContext context used to look up class loaders for deserialization */
+    public Axis2GroupChannel(ConfigurationContext configurationContext) {
+        this.configurationContext = configurationContext;
+    }
+    /** Decodes msg (unless flagged as a raw byte message) and hands it to every accepting listener. */
+    @Override
+    public void messageReceived(ChannelMessage msg) {
+        if ( msg == null ) return;
+        try {
+            if ( Logs.MESSAGES.isTraceEnabled() ) {
+                Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) +
+                                    " at " +new java.sql.Timestamp(System.currentTimeMillis())+
+                                    " from "+msg.getAddress().getName());
+            }
+            // Byte-flagged messages are wrapped undecoded; all others are deserialized right here.
+            Serializable fwd;
+            if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
+                fwd = new ByteMessage(msg.getMessage().getBytes());
+            } else {
+                try {
+                    fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
+                                                  msg.getMessage().getLength(),
+                                                  ClassLoaderUtil.getClassLoaders(configurationContext));
+                }catch (Exception sx) {
+                    log.error("Unable to deserialize message:"+msg,sx);
+                    return; // message is dropped: no listener is called and nothing is rethrown
+                }
+            }
+            if ( Logs.MESSAGES.isTraceEnabled() ) {
+                Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
+            }
+
+            //get the actual member with the correct alive time
+            Member source = msg.getAddress();
+            boolean rx = false; // becomes true once an RpcChannel listener has accepted the message
+            boolean delivered = false; // only reported in the trace log at the end
+            for (Object channelListener1 : channelListeners) {
+                ChannelListener channelListener = (ChannelListener) channelListener1;
+                if (channelListener != null && channelListener.accept(fwd, source)) {
+                    channelListener.messageReceived(fwd, source);
+                    delivered = true;
+                    //if the message was accepted by an RPC channel, that channel
+                    //is responsible for returning the reply, otherwise we send an absence reply
+                    if (channelListener instanceof RpcChannel) rx = true;
+                }
+            }//for
+            if ((!rx) && (fwd instanceof RpcMessage)) {
+                //if we have a message that requires a response,
+                //but none was given, send back an immediate one
+                sendNoRpcChannelReply((RpcMessage)fwd,source);
+            }
+            if ( Logs.MESSAGES.isTraceEnabled() ) {
+                Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
+            }
+
+        } catch ( Exception x ) {
+            //this could be the channel listener throwing an exception, we should log it
+            //as a warning, then rethrow it wrapped in a RemoteProcessException.
+            if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
+            throw new RemoteProcessException("Exception:"+x.getMessage(),x);
+        }
+    }
+}

Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=1243880&r1=1243879&r2=1243880&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Tue Feb 14 11:23:12 2012
@@ -65,6 +65,7 @@ public class ChannelSender implements Me
                     channel.send(members, toByteMessage(msg),
                                  Channel.SEND_OPTIONS_USE_ACK |
                                  Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+                                 Channel.SEND_OPTIONS_BYTE_MESSAGE |
                                  TribesConstants.MSG_ORDER_OPTION |
                                  TribesConstants.AT_MOST_ONCE_OPTION |
                                  additionalOptions);
@@ -72,6 +73,7 @@ public class ChannelSender implements Me
                     channel.send(members, toByteMessage(msg),
                                  Channel.SEND_OPTIONS_ASYNCHRONOUS |
                                  TribesConstants.MSG_ORDER_OPTION |
+                                 Channel.SEND_OPTIONS_BYTE_MESSAGE |
                                  TribesConstants.AT_MOST_ONCE_OPTION |
                                  additionalOptions);
                 }
@@ -119,7 +121,8 @@ public class ChannelSender implements Me
         try {
             channel.send(new Member[]{channel.getLocalMember(true)},
                          toByteMessage(msg),
-                         Channel.SEND_OPTIONS_USE_ACK);
+                         Channel.SEND_OPTIONS_USE_ACK |
+                         Channel.SEND_OPTIONS_BYTE_MESSAGE);
             if (log.isDebugEnabled()) {
                 log.debug("Sent " + msg + " to self");
             }
@@ -134,6 +137,7 @@ public class ChannelSender implements Me
                 channel.send(new Member[]{member}, toByteMessage(cmd),
                              Channel.SEND_OPTIONS_USE_ACK |
                              Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+                             Channel.SEND_OPTIONS_BYTE_MESSAGE |
                              TribesConstants.MSG_ORDER_OPTION |
                              TribesConstants.AT_MOST_ONCE_OPTION);
                 if (log.isDebugEnabled()) {

Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java?rev=1243880&view=auto
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java (added)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java Tue Feb 14 11:23:12 2012
@@ -0,0 +1,49 @@
+/*
+*  Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+*  WSO2 Inc. licenses this file to you under the Apache License,
+*  Version 2.0 (the "License"); you may not use this file except
+*  in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.engine.AxisConfiguration;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Utility that gathers the class loaders consulted when deserializing Tribes messages.
+ */
+public class ClassLoaderUtil {
+    /** Returns, in order: the system loader, this class's loader, then all service-group and module loaders. */
+    public static ClassLoader[] getClassLoaders(ConfigurationContext configurationContext) {
+        AxisConfiguration configuration = configurationContext.getAxisConfiguration();
+        List<ClassLoader> classLoaders = new ArrayList<ClassLoader>(); // rebuilt on every call, nothing is cached
+        classLoaders.add(configuration.getSystemClassLoader());
+        classLoaders.add(ClassLoaderUtil.class.getClassLoader());
+        for (Iterator iter = configuration.getServiceGroups(); iter.hasNext(); ) {
+            AxisServiceGroup group = (AxisServiceGroup) iter.next();
+            classLoaders.add(group.getServiceGroupClassLoader()); // NOTE(review): nulls/duplicates are not filtered - confirm callers tolerate them
+        }
+        for (Object obj : configuration.getModules().values()) {
+            AxisModule module = (AxisModule) obj;
+            classLoaders.add(module.getModuleClassLoader()); // NOTE(review): same caveat as for service groups
+        }
+        return classLoaders.toArray(new ClassLoader[classLoaders.size()]);
+    }
+}

Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1243880&r1=1243879&r2=1243880&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java Tue Feb 14 11:23:12 2012
@@ -156,7 +156,7 @@ public class TribesClusteringAgent imple
         addRequestBlockingHandlerToInFlows();
         primaryMembershipManager = new MembershipManager(configurationContext);
 
-        channel = new GroupChannel();
+        channel = new Axis2GroupChannel(configurationContext);
         channel.setHeartbeat(true);
         channelSender = new ChannelSender(channel, primaryMembershipManager, synchronizeAllMembers());
         axis2ChannelListener =
@@ -225,7 +225,7 @@ public class TribesClusteringAgent imple
             ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
             configurationContext.addContextListener(contextListener);
         }
-
+        configurationContext.getAxisConfiguration().addObservers(new TribesAxisObserver());
         configurationContext.
                 setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
         log.info("Cluster initialization completed.");
@@ -680,7 +680,7 @@ public class TribesClusteringAgent imple
                             primaryMembershipManager.getLongestLivingMember() : // First try to get from the longest member alive
                             primaryMembershipManager.getRandomMember(); // Else get from a random member
             String memberHost = TribesUtil.getName(member);
-            log.info("Trying to send intialization request to " + memberHost);
+            log.info("Trying to send initialization request to " + memberHost);
             try {
                 if (!sentMembersList.contains(memberHost)) {
                     Response[] responses;
@@ -688,7 +688,8 @@ public class TribesClusteringAgent imple
                     responses = rpcInitChannel.send(new Member[]{member},
                                                     command,
                                                     RpcChannel.FIRST_REPLY,
-                                                    Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                    Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                                    Channel.SEND_OPTIONS_BYTE_MESSAGE,
                                                     10000);
                     if (responses.length == 0) {
                         try {