You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by az...@apache.org on 2012/02/14 12:23:12 UTC
svn commit: r1243880 - in
/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes:
Axis2ChannelListener.java Axis2GroupChannel.java ChannelSender.java
ClassLoaderUtil.java TribesClusteringAgent.java
Author: azeez
Date: Tue Feb 14 11:23:12 2012
New Revision: 1243880
URL: http://svn.apache.org/viewvc?rev=1243880&view=rev
Log:
This fixes the issue of state replication failing if the classes of the replicated objects are contained within service archive & module archive files. We had to take control of the Tribes message deserialization mechanism at the channel level so that we can set the relevant classloaders before deserialization.
Added:
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java
Modified:
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java?rev=1243880&r1=1243879&r2=1243880&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2ChannelListener.java Tue Feb 14 11:23:12 2012
@@ -27,9 +27,6 @@ import org.apache.axis2.clustering.manag
import org.apache.axis2.clustering.state.DefaultStateManager;
import org.apache.axis2.clustering.state.StateClusteringCommand;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.AxisModule;
-import org.apache.axis2.description.AxisServiceGroup;
-import org.apache.axis2.engine.AxisConfiguration;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
@@ -40,9 +37,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
/**
* This is the Tribes channel listener which is used for listening on the channels, receiving
@@ -93,23 +87,11 @@ public class Axis2ChannelListener implem
*/
public void messageReceived(Serializable msg, Member sender) {
try {
- AxisConfiguration configuration = configurationContext.getAxisConfiguration();
- List<ClassLoader> classLoaders = new ArrayList<ClassLoader>();
- classLoaders.add(configuration.getSystemClassLoader());
- classLoaders.add(getClass().getClassLoader());
- for (Iterator iter = configuration.getServiceGroups(); iter.hasNext();) {
- AxisServiceGroup group = (AxisServiceGroup) iter.next();
- classLoaders.add(group.getServiceGroupClassLoader());
- }
- for(Object obj: configuration.getModules().values()){
- AxisModule module = (AxisModule) obj;
- classLoaders.add(module.getModuleClassLoader());
- }
byte[] message = ((ByteMessage) msg).getMessage();
msg = XByteBuffer.deserialize(message,
0,
message.length,
- classLoaders.toArray(new ClassLoader[classLoaders.size()]));
+ ClassLoaderUtil.getClassLoaders(configurationContext));
} catch (Exception e) {
String errMsg = "Cannot deserialize received message";
log.error(errMsg, e);
Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java?rev=1243880&view=auto
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java (added)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java Tue Feb 14 11:23:12 2012
@@ -0,0 +1,105 @@
+/*
+* Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.RpcMessage;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.util.Logs;
+
+import java.io.Serializable;
+
+/**
+ * Represents a Tribes GroupChannel. The difference between
+ * org.apache.catalina.tribes.group.GroupChannel & this class is that the proper classloaders
+ * are set before message deserialization
+ */
+public class Axis2GroupChannel extends GroupChannel{
+
+ private ConfigurationContext configurationContext;
+
+ public Axis2GroupChannel(ConfigurationContext configurationContext) {
+ this.configurationContext = configurationContext;
+ }
+
+ @Override
+ public void messageReceived(ChannelMessage msg) {
+ if ( msg == null ) return;
+ try {
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) +
+ " at " +new java.sql.Timestamp(System.currentTimeMillis())+
+ " from "+msg.getAddress().getName());
+ }
+
+ Serializable fwd;
+ if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
+ fwd = new ByteMessage(msg.getMessage().getBytes());
+ } else {
+ try {
+ fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
+ msg.getMessage().getLength(),
+ ClassLoaderUtil.getClassLoaders(configurationContext));
+ }catch (Exception sx) {
+ log.error("Unable to deserialize message:"+msg,sx);
+ return;
+ }
+ }
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
+ }
+
+ //get the actual member with the correct alive time
+ Member source = msg.getAddress();
+ boolean rx = false;
+ boolean delivered = false;
+ for (Object channelListener1 : channelListeners) {
+ ChannelListener channelListener = (ChannelListener) channelListener1;
+ if (channelListener != null && channelListener.accept(fwd, source)) {
+ channelListener.messageReceived(fwd, source);
+ delivered = true;
+ //if the message was accepted by an RPC channel, that channel
+ //is responsible for returning the reply, otherwise we send an absence reply
+ if (channelListener instanceof RpcChannel) rx = true;
+ }
+ }//for
+ if ((!rx) && (fwd instanceof RpcMessage)) {
+ //if we have a message that requires a response,
+ //but none was given, send back an immediate one
+ sendNoRpcChannelReply((RpcMessage)fwd,source);
+ }
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
+ }
+
+ } catch ( Exception x ) {
+ //this could be the channel listener throwing an exception, we should log it
+ //as a warning.
+ if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
+ throw new RemoteProcessException("Exception:"+x.getMessage(),x);
+ }
+ }
+}
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=1243880&r1=1243879&r2=1243880&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Tue Feb 14 11:23:12 2012
@@ -65,6 +65,7 @@ public class ChannelSender implements Me
channel.send(members, toByteMessage(msg),
Channel.SEND_OPTIONS_USE_ACK |
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE |
TribesConstants.MSG_ORDER_OPTION |
TribesConstants.AT_MOST_ONCE_OPTION |
additionalOptions);
@@ -72,6 +73,7 @@ public class ChannelSender implements Me
channel.send(members, toByteMessage(msg),
Channel.SEND_OPTIONS_ASYNCHRONOUS |
TribesConstants.MSG_ORDER_OPTION |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE |
TribesConstants.AT_MOST_ONCE_OPTION |
additionalOptions);
}
@@ -119,7 +121,8 @@ public class ChannelSender implements Me
try {
channel.send(new Member[]{channel.getLocalMember(true)},
toByteMessage(msg),
- Channel.SEND_OPTIONS_USE_ACK);
+ Channel.SEND_OPTIONS_USE_ACK |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE);
if (log.isDebugEnabled()) {
log.debug("Sent " + msg + " to self");
}
@@ -134,6 +137,7 @@ public class ChannelSender implements Me
channel.send(new Member[]{member}, toByteMessage(cmd),
Channel.SEND_OPTIONS_USE_ACK |
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE |
TribesConstants.MSG_ORDER_OPTION |
TribesConstants.AT_MOST_ONCE_OPTION);
if (log.isDebugEnabled()) {
Added: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java?rev=1243880&view=auto
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java (added)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java Tue Feb 14 11:23:12 2012
@@ -0,0 +1,49 @@
+/*
+* Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.engine.AxisConfiguration;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A util for manipulating classloaders to be used while serializing & deserializing Tribes messages
+ */
+public class ClassLoaderUtil {
+
+ public static ClassLoader[] getClassLoaders(ConfigurationContext configurationContext) {
+ AxisConfiguration configuration = configurationContext.getAxisConfiguration();
+ List<ClassLoader> classLoaders = new ArrayList<ClassLoader>();
+ classLoaders.add(configuration.getSystemClassLoader());
+ classLoaders.add(ClassLoaderUtil.class.getClassLoader());
+ for (Iterator iter = configuration.getServiceGroups(); iter.hasNext(); ) {
+ AxisServiceGroup group = (AxisServiceGroup) iter.next();
+ classLoaders.add(group.getServiceGroupClassLoader());
+ }
+ for (Object obj : configuration.getModules().values()) {
+ AxisModule module = (AxisModule) obj;
+ classLoaders.add(module.getModuleClassLoader());
+ }
+ return classLoaders.toArray(new ClassLoader[classLoaders.size()]);
+ }
+}
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1243880&r1=1243879&r2=1243880&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java Tue Feb 14 11:23:12 2012
@@ -156,7 +156,7 @@ public class TribesClusteringAgent imple
addRequestBlockingHandlerToInFlows();
primaryMembershipManager = new MembershipManager(configurationContext);
- channel = new GroupChannel();
+ channel = new Axis2GroupChannel(configurationContext);
channel.setHeartbeat(true);
channelSender = new ChannelSender(channel, primaryMembershipManager, synchronizeAllMembers());
axis2ChannelListener =
@@ -225,7 +225,7 @@ public class TribesClusteringAgent imple
ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
configurationContext.addContextListener(contextListener);
}
-
+ configurationContext.getAxisConfiguration().addObservers(new TribesAxisObserver());
configurationContext.
setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
log.info("Cluster initialization completed.");
@@ -680,7 +680,7 @@ public class TribesClusteringAgent imple
primaryMembershipManager.getLongestLivingMember() : // First try to get from the longest member alive
primaryMembershipManager.getRandomMember(); // Else get from a random member
String memberHost = TribesUtil.getName(member);
- log.info("Trying to send intialization request to " + memberHost);
+ log.info("Trying to send initialization request to " + memberHost);
try {
if (!sentMembersList.contains(memberHost)) {
Response[] responses;
@@ -688,7 +688,8 @@ public class TribesClusteringAgent imple
responses = rpcInitChannel.send(new Member[]{member},
command,
RpcChannel.FIRST_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE,
10000);
if (responses.length == 0) {
try {