You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by az...@apache.org on 2012/03/01 12:28:59 UTC
svn commit: r1295519 - in /axis/axis2/java/core/branches/1_6/modules:
clustering/src/org/apache/axis2/clustering/
clustering/src/org/apache/axis2/clustering/tribes/
kernel/src/org/apache/axis2/clustering/
Author: azeez
Date: Thu Mar 1 11:28:58 2012
New Revision: 1295519
URL: http://svn.apache.org/viewvc?rev=1295519&view=rev
Log:
Manually merging in all clustering changes from Axis2 trunk to the Axis2 1_6 branch
Added:
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java
axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java
Modified:
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java Thu Mar 1 11:28:58 2012
@@ -61,7 +61,7 @@ public class ClusteringUtils {
tempDirectory + File.separator +
(System.currentTimeMillis() + RANDOM.nextDouble());
if(!new File(tempDirName).mkdirs()) {
- throw new Exception("Could not create temp dir " + tempDirName);
+ throw new Exception("Could not create temp dir " + tempDirName);
}
serviceArchive = new File(tempDirName + File.separator + serviceGroupName);
FileOutputStream out = null;
Added: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/Axis2GroupChannel.java Thu Mar 1 11:28:58 2012
@@ -0,0 +1,99 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.UniqueId;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.RpcMessage;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.util.Logs;
+
+import java.io.Serializable;
+
+/**
+ * Represents a Tribes GroupChannel. The difference between
+ * org.apache.catalina.tribes.group.GroupChannel & this class is that the proper classloaders
+ * are set before message deserialization
+ */
+public class Axis2GroupChannel extends GroupChannel{
+
+ @Override
+ public void messageReceived(ChannelMessage msg) {
+ if ( msg == null ) return;
+ try {
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) +
+ " at " +new java.sql.Timestamp(System.currentTimeMillis())+
+ " from "+msg.getAddress().getName());
+ }
+
+ Serializable fwd;
+ if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
+ fwd = new ByteMessage(msg.getMessage().getBytes());
+ } else {
+ try {
+ fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
+ msg.getMessage().getLength(),
+ ClassLoaderUtil.getClassLoaders());
+ }catch (Exception sx) {
+ log.error("Unable to deserialize message:"+msg,sx);
+ return;
+ }
+ }
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
+ }
+
+ //get the actual member with the correct alive time
+ Member source = msg.getAddress();
+ boolean rx = false;
+ boolean delivered = false;
+ for (Object channelListener1 : channelListeners) {
+ ChannelListener channelListener = (ChannelListener) channelListener1;
+ if (channelListener != null && channelListener.accept(fwd, source)) {
+ channelListener.messageReceived(fwd, source);
+ delivered = true;
+ //if the message was accepted by an RPC channel, that channel
+ //is responsible for returning the reply, otherwise we send an absence reply
+ if (channelListener instanceof RpcChannel) rx = true;
+ }
+ }//for
+ if ((!rx) && (fwd instanceof RpcMessage)) {
+ //if we have a message that requires a response,
+ //but none was given, send back an immediate one
+ sendNoRpcChannelReply((RpcMessage)fwd,source);
+ }
+ if ( Logs.MESSAGES.isTraceEnabled() ) {
+ Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
+ }
+
+ } catch ( Exception x ) {
+ //this could be the channel listener throwing an exception, we should log it
+ //as a warning.
+ if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
+ throw new RemoteProcessException("Exception:"+x.getMessage(),x);
+ }
+ }
+}
Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Thu Mar 1 11:28:58 2012
@@ -49,9 +49,9 @@ public class ChannelSender implements Me
this.synchronizeAllMembers = synchronizeAllMembers;
}
- public void sendToGroup(ClusteringCommand msg,
- MembershipManager membershipManager,
- int additionalOptions) throws ClusteringFault {
+ public synchronized void sendToGroup(ClusteringCommand msg,
+ MembershipManager membershipManager,
+ int additionalOptions) throws ClusteringFault {
if (channel == null) {
return;
}
@@ -65,6 +65,7 @@ public class ChannelSender implements Me
channel.send(members, toByteMessage(msg),
Channel.SEND_OPTIONS_USE_ACK |
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE |
TribesConstants.MSG_ORDER_OPTION |
TribesConstants.AT_MOST_ONCE_OPTION |
additionalOptions);
@@ -72,6 +73,7 @@ public class ChannelSender implements Me
channel.send(members, toByteMessage(msg),
Channel.SEND_OPTIONS_ASYNCHRONOUS |
TribesConstants.MSG_ORDER_OPTION |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE |
TribesConstants.AT_MOST_ONCE_OPTION |
additionalOptions);
}
@@ -100,7 +102,7 @@ public class ChannelSender implements Me
}
public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
- sendToGroup(msg, this.membershipManager, 0);
+ sendToGroup(msg, this.membershipManager, 0);
}
private ByteMessage toByteMessage(ClusteringCommand msg) throws IOException {
@@ -119,7 +121,8 @@ public class ChannelSender implements Me
try {
channel.send(new Member[]{channel.getLocalMember(true)},
toByteMessage(msg),
- Channel.SEND_OPTIONS_USE_ACK);
+ Channel.SEND_OPTIONS_USE_ACK |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE);
if (log.isDebugEnabled()) {
log.debug("Sent " + msg + " to self");
}
@@ -134,6 +137,7 @@ public class ChannelSender implements Me
channel.send(new Member[]{member}, toByteMessage(cmd),
Channel.SEND_OPTIONS_USE_ACK |
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE |
TribesConstants.MSG_ORDER_OPTION |
TribesConstants.AT_MOST_ONCE_OPTION);
if (log.isDebugEnabled()) {
Added: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/ClassLoaderUtil.java Thu Mar 1 11:28:58 2012
@@ -0,0 +1,88 @@
+/*
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.engine.AxisConfiguration;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A util for manipulating classloaders to be used while serializing & deserializing Tribes messages
+ */
+public class ClassLoaderUtil {
+
+ private static Map<String, ClassLoader> classLoaders =
+ new ConcurrentHashMap<String, ClassLoader>();
+
+ public static void init(AxisConfiguration configuration) {
+ classLoaders.put("system", configuration.getSystemClassLoader());
+ classLoaders.put("axis2", ClassLoaderUtil.class.getClassLoader());
+ for (Iterator iter = configuration.getServiceGroups(); iter.hasNext(); ) {
+ AxisServiceGroup group = (AxisServiceGroup) iter.next();
+ ClassLoader serviceGroupClassLoader = group.getServiceGroupClassLoader();
+ if (serviceGroupClassLoader != null) {
+ classLoaders.put(getServiceGroupMapKey(group), serviceGroupClassLoader);
+ }
+ }
+ for (Object obj : configuration.getModules().values()) {
+ AxisModule module = (AxisModule) obj;
+ ClassLoader moduleClassLoader = module.getModuleClassLoader();
+ if (moduleClassLoader != null) {
+ classLoaders.put(getModuleMapKey(module), moduleClassLoader);
+ }
+ }
+ }
+
+ public static void addServiceGroupClassLoader(AxisServiceGroup serviceGroup) {
+ ClassLoader serviceGroupClassLoader = serviceGroup.getServiceGroupClassLoader();
+ if (serviceGroupClassLoader != null) {
+ classLoaders.put(getServiceGroupMapKey(serviceGroup), serviceGroupClassLoader);
+ }
+ }
+
+ public static void removeServiceGroupClassLoader(AxisServiceGroup serviceGroup) {
+ classLoaders.remove(getServiceGroupMapKey(serviceGroup));
+ }
+
+ private static String getServiceGroupMapKey(AxisServiceGroup serviceGroup) {
+ return serviceGroup.getServiceGroupName() + "$#sg";
+ }
+
+ public static void addModuleClassLoader(AxisModule module) {
+ ClassLoader moduleClassLoader = module.getModuleClassLoader();
+ if (moduleClassLoader != null) {
+ classLoaders.put(getModuleMapKey(module), moduleClassLoader);
+ }
+ }
+
+ public static void removeModuleClassLoader(AxisModule axisModule) {
+ classLoaders.remove(getModuleMapKey(axisModule));
+ }
+
+ private static String getModuleMapKey(AxisModule module) {
+ return module.getName() + "-" + module.getVersion() + "$#mod";
+ }
+
+ public static ClassLoader[] getClassLoaders() {
+ return classLoaders.values().toArray(new ClassLoader[classLoaders.size()]);
+ }
+}
Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Thu Mar 1 11:28:58 2012
@@ -37,6 +37,10 @@ import org.apache.commons.logging.LogFac
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Responsible for managing the membership. Handles membership changes.
@@ -68,10 +72,22 @@ public class MembershipManager {
private final List<Member> wkaMembers = new ArrayList<Member>();
/**
+ * List of Well-Known members which have not responded to the MEMBER_LIST message.
+ * We need to retry sending the MEMBER_LIST message to these members until they respond,
+ * otherwise, we cannot be sure whether these WKA members added the members in the MEMBER_LIST
+ */
+ private final List<Member> nonRespondingWkaMembers = new CopyOnWriteArrayList<Member>();
+
+ /**
* The member representing this node
*/
private Member localMember;
+ /**
+ *
+ */
+ private boolean isMemberListResponseReceived;
+
public MembershipManager(ConfigurationContext configContext) {
this.configContext = configContext;
}
@@ -87,9 +103,10 @@ public class MembershipManager {
return rpcMembershipChannel;
}
- public void setStaticMembershipInterceptor(
- StaticMembershipInterceptor staticMembershipInterceptor) {
+ public void setupStaticMembershipManagement(StaticMembershipInterceptor staticMembershipInterceptor) {
this.staticMembershipInterceptor = staticMembershipInterceptor;
+ ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+ scheduler.scheduleWithFixedDelay(new MemberListSenderTask(), 5, 5, TimeUnit.SECONDS);
}
public void setGroupManagementAgent(GroupManagementAgent groupManagementAgent) {
@@ -157,54 +174,93 @@ public class MembershipManager {
}
if (shouldAddMember) {
+ boolean wkaMemberBelongsToLocalDomain = true;
if (rpcMembershipChannel != null && isLocalMemberInitialized() &&
wkaMembers.contains(member)) { // if it is a well-known member
log.info("A WKA member " + TribesUtil.getName(member) +
" just joined the group. Sending MEMBER_LIST message.");
- // send the member list to it
- MemberListCommand memListCmd;
- try {
- memListCmd = new MemberListCommand();
- List<Member> members = new ArrayList<Member>(this.members);
- members.add(localMember); // Need to set the local member too
- memListCmd.setMembers(members.toArray(new Member[members.size()]));
-
- Response[] responses =
- rpcMembershipChannel.send(new Member[]{member}, memListCmd,
- RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS |
- TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
-
- // Once a response is received from the WKA member to the MEMBER_LIST message,
- // if it does not belong to this domain, simply remove it from the members
- if (responses != null && responses.length > 0 && responses[0] != null) {
- Member source = responses[0].getSource();
- if (!TribesUtil.areInSameDomain(source, member)) {
- if (log.isDebugEnabled()) {
- log.debug("WKA Member " + TribesUtil.getName(source) +
- " does not belong to local domain " + new String(domain) +
- ". Hence removing it from the list.");
- }
- members.remove(member);
- return false;
+ wkaMemberBelongsToLocalDomain = sendMemberListToWellKnownMember(member);
+ }
+ if (wkaMemberBelongsToLocalDomain) {
+ members.add(member);
+ if (log.isDebugEnabled()) {
+ log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
+ new String(member.getDomain()));
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Task which send MEMBER_LIST messages to WKA members which have not yet responded to the
+ * MEMBER_LIST message
+ */
+ private class MemberListSenderTask implements Runnable {
+ public void run() {
+ try {
+ if (nonRespondingWkaMembers != null && !nonRespondingWkaMembers.isEmpty()) {
+ for (Member wkaMember : nonRespondingWkaMembers) {
+ if (wkaMember != null) {
+ sendMemberListToWellKnownMember(wkaMember);
}
}
- } catch (Exception e) {
- String errMsg = "Could not send MEMBER_LIST to well-known member " +
- TribesUtil.getName(member);
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
}
+ } catch (Throwable e) {
+ log.error("Could not send MemberList to WKA Members", e);
}
- members.add(member);
- if (log.isDebugEnabled()) {
- log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
- new String(member.getDomain()));
+ }
+ }
+
+ /**
+ * Send MEMBER_LIST message to WKA member
+ *
+ * @param wkaMember The WKA member to whom the MEMBER_LIST has to be sent
+ * @return true - if the WKA member belongs to the domain of this local member
+ */
+ private boolean sendMemberListToWellKnownMember(Member wkaMember) {
+ /*if (wkaMember.isFailing() || wkaMember.isSuspect()) {
+ return false;
+ }*/
+ // send the member list to it
+ MemberListCommand memListCmd;
+ try {
+ memListCmd = new MemberListCommand();
+ List<Member> members = new ArrayList<Member>(this.members);
+ members.add(localMember); // Need to set the local member too
+ memListCmd.setMembers(members.toArray(new Member[members.size()]));
+
+ Response[] responses =
+ rpcMembershipChannel.send(new Member[]{wkaMember}, memListCmd,
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ TribesConstants.MEMBERSHIP_MSG_OPTION, 10000);
+
+ // Once a response is received from the WKA member to the MEMBER_LIST message,
+ // if it does not belong to this domain, simply remove it from the members
+ if (responses != null && responses.length > 0 && responses[0] != null) {
+ nonRespondingWkaMembers.remove(wkaMember);
+ Member source = responses[0].getSource();
+ if (!TribesUtil.areInSameDomain(source, wkaMember)) {
+ if (log.isDebugEnabled()) {
+ log.debug("WKA Member " + TribesUtil.getName(source) +
+ " does not belong to local domain " + new String(domain) +
+ ". Hence removing it from the list.");
+ }
+ return false;
+ }
+ } else { // No response from WKA member
+ nonRespondingWkaMembers.add(wkaMember);
}
- return true;
+ } catch (Exception e) {
+ String errMsg = "Could not send MEMBER_LIST to well-known member " +
+ TribesUtil.getName(wkaMember);
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
}
- return false;
+ return true;
}
/**
@@ -278,6 +334,7 @@ public class MembershipManager {
*/
public void memberDisappeared(Member member) {
members.remove(member);
+ nonRespondingWkaMembers.remove(member);
// Is this an application domain member?
if (groupManagementAgent != null) {
Added: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcMessagingHandler.java Thu Mar 1 11:28:58 2012
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusteringMessage;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+
+/**
+ * Handles RPC messages from members
+ */
+public class RpcMessagingHandler implements RpcCallback {
+
+ private static Log log = LogFactory.getLog(RpcMessagingHandler.class);
+
+ private ConfigurationContext configurationContext;
+
+ public RpcMessagingHandler(ConfigurationContext configurationContext) {
+ this.configurationContext = configurationContext;
+ }
+
+ public void setConfigurationContext(ConfigurationContext configurationContext) {
+ this.configurationContext = configurationContext;
+ }
+
+ public Serializable replyRequest(Serializable msg, Member invoker) {
+ if (log.isDebugEnabled()) {
+ log.debug("RPC request received by RpcMessagingHandler");
+ }
+ if (msg instanceof ClusteringMessage) {
+ ClusteringMessage clusteringMsg = (ClusteringMessage) msg;
+ try {
+ clusteringMsg.execute(configurationContext);
+ } catch (ClusteringFault e) {
+ String errMsg = "Cannot handle RPC message";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
+ }
+ return clusteringMsg.getResponse();
+ } else {
+ throw new IllegalArgumentException("Invalid RPC message of type " + msg.getClass() +
+ " received");
+ }
+ }
+
+ public void leftOver(Serializable msg, Member member) {
+ //TODO: Method implementation
+ }
+}
Added: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesAxisObserver.java Thu Mar 1 11:28:58 2012
@@ -0,0 +1,84 @@
+/*
+* Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.engine.AxisEvent;
+import org.apache.axis2.engine.AxisObserver;
+
+import java.util.ArrayList;
+
+/**
+ * AxisObserver which specifically handles setting of service & module classloaders for
+ * message deserialization by Tribes
+ */
+public class TribesAxisObserver implements AxisObserver {
+ public void init(AxisConfiguration axisConfiguration) {
+ //Nothing to do
+ }
+
+ public void serviceUpdate(AxisEvent axisEvent, AxisService axisService) {
+ //Nothing to do
+ }
+
+ public void serviceGroupUpdate(AxisEvent axisEvent, AxisServiceGroup axisServiceGroup) {
+ if (axisEvent.getEventType() == AxisEvent.SERVICE_DEPLOY) {
+ ClassLoaderUtil.addServiceGroupClassLoader(axisServiceGroup);
+ } else if (axisEvent.getEventType() == AxisEvent.SERVICE_REMOVE) {
+ ClassLoaderUtil.removeServiceGroupClassLoader(axisServiceGroup);
+ }
+ }
+
+ public void moduleUpdate(AxisEvent axisEvent, AxisModule axisModule) {
+ if (axisEvent.getEventType() == AxisEvent.MODULE_DEPLOY) {
+ ClassLoaderUtil.addModuleClassLoader(axisModule);
+ } else if (axisEvent.getEventType() == AxisEvent.MODULE_DEPLOY) {
+ ClassLoaderUtil.removeModuleClassLoader(axisModule);
+ }
+ }
+
+ public void addParameter(Parameter parameter) throws AxisFault {
+ //Nothing to do
+ }
+
+ public void removeParameter(Parameter parameter) throws AxisFault {
+ //Nothing to do
+ }
+
+ public void deserializeParameters(OMElement omElement) throws AxisFault {
+ //Nothing to do
+ }
+
+ public Parameter getParameter(String carbonHome) {
+ return null; //Nothing to do
+ }
+
+ public ArrayList<Parameter> getParameters() {
+ return null; //Nothing to do
+ }
+
+ public boolean isParameterLocked(String carbonHome) {
+ return false; //Nothing to do
+ }
+}
Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusteringAgent.java Thu Mar 1 11:28:58 2012
@@ -23,8 +23,10 @@ import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.ClusteringCommand;
import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusteringMessage;
import org.apache.axis2.clustering.MembershipListener;
import org.apache.axis2.clustering.MembershipScheme;
import org.apache.axis2.clustering.RequestBlockingHandler;
@@ -48,9 +50,10 @@ import org.apache.axis2.engine.DispatchP
import org.apache.axis2.engine.Phase;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ErrorHandler;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.group.Response;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.transport.MultiPointSender;
@@ -83,7 +86,14 @@ public class TribesClusteringAgent imple
private final HashMap<String, Parameter> parameters;
private ManagedChannel channel;
+ /**
+ * RpcChannel used for cluster initialization interactions
+ */
private RpcChannel rpcInitChannel;
+ /**
+ * RpcChannel used for RPC messaging interactions
+ */
+ private RpcChannel rpcMessagingChannel;
private ConfigurationContext configurationContext;
private Axis2ChannelListener axis2ChannelListener;
private ChannelSender channelSender;
@@ -104,6 +114,7 @@ public class TribesClusteringAgent imple
private final Map<String, GroupManagementAgent> groupManagementAgents =
new HashMap<String, GroupManagementAgent>();
private boolean clusterManagementMode;
+ private RpcMessagingHandler rpcMessagingHandler;
public TribesClusteringAgent() {
parameters = new HashMap<String, Parameter>();
@@ -151,7 +162,8 @@ public class TribesClusteringAgent imple
addRequestBlockingHandlerToInFlows();
primaryMembershipManager = new MembershipManager(configurationContext);
- channel = new GroupChannel();
+ channel = new Axis2GroupChannel();
+ channel.setHeartbeat(true);
channelSender = new ChannelSender(channel, primaryMembershipManager, synchronizeAllMembers());
axis2ChannelListener =
new Axis2ChannelListener(configurationContext, configurationManager, contextManager);
@@ -165,10 +177,19 @@ public class TribesClusteringAgent imple
// picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
rpcInitRequestHandler = new RpcInitializationRequestHandler(configurationContext);
rpcInitChannel =
- new RpcChannel(TribesUtil.getRpcInitChannelId(domain),
- channel, rpcInitRequestHandler);
+ new RpcChannel(TribesUtil.getRpcInitChannelId(domain), channel,
+ rpcInitRequestHandler);
if (log.isDebugEnabled()) {
- log.debug("Created RPC Channel for domain " + new String(domain));
+ log.debug("Created RPC Init Channel for domain " + new String(domain));
+ }
+
+ // Initialize RpcChannel used for messaging
+ rpcMessagingHandler = new RpcMessagingHandler(configurationContext);
+ rpcMessagingChannel =
+ new RpcChannel(TribesUtil.getRpcMessagingChannelId(domain), channel,
+ rpcMessagingHandler);
+ if (log.isDebugEnabled()) {
+ log.debug("Created RPC Messaging Channel for domain " + new String(domain));
}
setMaximumRetries();
@@ -182,10 +203,8 @@ public class TribesClusteringAgent imple
channel.start(Channel.DEFAULT); // At this point, this member joins the group
String localHost = TribesUtil.getLocalHost(channel);
if (localHost.startsWith("127.0.")) {
- channel.stop(Channel.DEFAULT);
- throw new ClusteringFault("Cannot join cluster using IP " + localHost +
- ". Please set an IP address other than " +
- localHost + " in the axis2.xml file");
+ log.warn("Local member advertising its IP address as 127.0.0.1. " +
+ "Remote members will not be able to connect to this member.");
}
} catch (ChannelException e) {
String msg = "Error starting Tribes channel";
@@ -198,6 +217,9 @@ public class TribesClusteringAgent imple
membershipScheme.joinGroup();
+ configurationContext.getAxisConfiguration().addObservers(new TribesAxisObserver());
+ ClassLoaderUtil.init(configurationContext.getAxisConfiguration());
+
// If configuration management is enabled, get the latest config from a neighbour
if (configurationManager != null) {
configurationManager.setSender(channelSender);
@@ -212,7 +234,6 @@ public class TribesClusteringAgent imple
ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
configurationContext.addContextListener(contextListener);
}
-
configurationContext.
setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
log.info("Cluster initialization completed.");
@@ -230,6 +251,48 @@ public class TribesClusteringAgent imple
}
}
+ public List<ClusteringCommand> sendMessage(ClusteringMessage message,
+ boolean isRpcMessage) throws ClusteringFault {
+ List<ClusteringCommand> responseList = new ArrayList<ClusteringCommand>();
+ Member[] members = primaryMembershipManager.getMembers();
+ if (members.length == 0) {
+ return responseList;
+ }
+ if (isRpcMessage) {
+ try {
+ Response[] responses = rpcMessagingChannel.send(members, message, RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_SYNCHRONIZED_ACK,
+ 10000);
+ for (Response response : responses) {
+ responseList.add((ClusteringCommand)response.getMessage());
+ }
+ } catch (ChannelException e) {
+ String msg = "Error occurred while sending RPC message to cluster.";
+ log.error(msg, e);
+ throw new ClusteringFault(msg, e);
+ }
+ } else {
+ try {
+ channel.send(members, message, 10000, new ErrorHandler(){
+ public void handleError(ChannelException e, UniqueId uniqueId) {
+ log.error("Sending failed " + uniqueId, e );
+ }
+
+ public void handleCompletion(UniqueId uniqueId) {
+ if(log.isDebugEnabled()){
+ log.debug("Sending successful " + uniqueId);
+ }
+ }
+ });
+ } catch (ChannelException e) {
+ String msg = "Error occurred while sending message to cluster.";
+ log.error(msg, e);
+ throw new ClusteringFault(msg, e);
+ }
+ }
+ return responseList;
+ }
+
private void setMemberInfo() throws ClusteringFault {
Properties memberInfo = new Properties();
AxisConfiguration axisConfig = configurationContext.getAxisConfiguration();
@@ -263,11 +326,14 @@ public class TribesClusteringAgent imple
OMElement propEle = (OMElement) iter.next();
OMAttribute nameAttrib = propEle.getAttribute(new QName("name"));
if(nameAttrib != null){
+ String attribName = nameAttrib.getAttributeValue();
+ attribName = replaceProperty(attribName, memberInfo);
+
OMAttribute valueAttrib = propEle.getAttribute(new QName("value"));
if (valueAttrib != null) {
String attribVal = valueAttrib.getAttributeValue();
attribVal = replaceProperty(attribVal, memberInfo);
- memberInfo.setProperty(nameAttrib.getAttributeValue(), attribVal);
+ memberInfo.setProperty(attribName, attribVal);
}
}
}
@@ -295,10 +361,13 @@ public class TribesClusteringAgent imple
// and are assumed to be System properties
while (indexOfStartingChars < text.indexOf("${") &&
(indexOfStartingChars = text.indexOf("${")) != -1 &&
- (indexOfClosingBrace = text.indexOf("}")) != -1) { // Is a property used?
+ (indexOfClosingBrace = text.indexOf("}")) != -1) { // Is a property used?
String sysProp = text.substring(indexOfStartingChars + 2,
indexOfClosingBrace);
String propValue = props.getProperty(sysProp);
+ if (propValue == null) {
+ propValue = System.getProperty(sysProp);
+ }
if (propValue != null) {
text = text.substring(0, indexOfStartingChars) + propValue +
text.substring(indexOfClosingBrace + 1);
@@ -595,7 +664,7 @@ public class TribesClusteringAgent imple
* Get some information from a neighbour. This information will be used by this node to
* initialize itself
* <p/>
- * rpcChannel is The utility for sending RPC style messages to the channel
+ * rpcInitChannel is The utility for sending RPC style messages to the channel
*
* @param command The control command to send
* @throws ClusteringFault If initialization code failed on this node
@@ -619,7 +688,7 @@ public class TribesClusteringAgent imple
primaryMembershipManager.getLongestLivingMember() : // First try to get from the longest member alive
primaryMembershipManager.getRandomMember(); // Else get from a random member
String memberHost = TribesUtil.getName(member);
- log.info("Trying to send intialization request to " + memberHost);
+ log.info("Trying to send initialization request to " + memberHost);
try {
if (!sentMembersList.contains(memberHost)) {
Response[] responses;
@@ -627,7 +696,8 @@ public class TribesClusteringAgent imple
responses = rpcInitChannel.send(new Member[]{member},
command,
RpcChannel.FIRST_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ Channel.SEND_OPTIONS_BYTE_MESSAGE,
10000);
if (responses.length == 0) {
try {
@@ -710,6 +780,7 @@ public class TribesClusteringAgent imple
if (channel != null) {
try {
channel.removeChannelListener(rpcInitChannel);
+ channel.removeChannelListener(rpcMessagingChannel);
channel.removeChannelListener(axis2ChannelListener);
channel.stop(Channel.DEFAULT);
} catch (ChannelException e) {
@@ -729,6 +800,9 @@ public class TribesClusteringAgent imple
if (rpcInitRequestHandler != null) {
rpcInitRequestHandler.setConfigurationContext(configurationContext);
}
+ if (rpcMessagingHandler!= null) {
+ rpcMessagingHandler.setConfigurationContext(configurationContext);
+ }
if (axis2ChannelListener != null) {
axis2ChannelListener.setConfigurationContext(configurationContext);
}
Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java Thu Mar 1 11:28:58 2012
@@ -26,6 +26,11 @@ public final class TribesConstants {
public static final String RPC_INIT_CHANNEL = "rpc.init.channel";
/**
+ * The ID of the RPC messaging channel
+ */
+ public static final String RPC_MESSAGING_CHANNEL = "rpc.msg.channel";
+
+ /**
* The ID of the RPC membership message channel. This channel is only used when WKA
* membership discovery mechanism is used
*/
Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Thu Mar 1 11:28:58 2012
@@ -101,6 +101,10 @@ public class TribesUtil {
return (new String(domain) + ":" + TribesConstants.RPC_INIT_CHANNEL).getBytes();
}
+ public static byte[] getRpcMessagingChannelId(byte[] domain) {
+ return (new String(domain) + ":" + TribesConstants.RPC_MESSAGING_CHANNEL).getBytes();
+ }
+
public static boolean isInDomain(Member member, byte[] domain) {
return Arrays.equals(domain, member.getDomain());
}
Modified: axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Thu Mar 1 11:28:58 2012
@@ -1,17 +1,17 @@
-/*
- * Copyright 2004,2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.axis2.clustering.tribes;
@@ -153,13 +153,13 @@ public class WkaBasedMembershipScheme im
try {
if (localPort != null) {
port = Integer.parseInt(((String) localPort.getValue()).trim());
- port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 100);
+ port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 1000);
} else { // In cases where the localport needs to be automatically figured out
- port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 100);
+ port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 1000);
}
} catch (IOException e) {
String msg =
- "Could not allocate the specified port or a port in the range 4000-4100 " +
+ "Could not allocate the specified port or a port in the range 4000-5000 " +
"for local host " + localMember.getHostname() +
". Check whether the IP address specified or inferred for the local " +
"member is correct.";
@@ -224,7 +224,7 @@ public class WkaBasedMembershipScheme im
return true;
} catch (IOException e) {
String msg = e.getMessage();
- if (msg.indexOf("Connection refused") == -1 && msg.indexOf("connect timed out") == -1) {
+ if (!msg.contains("Connection refused") && !msg.contains("connect timed out")) {
log.error("Cannot connect to member " +
member.getHostName() + ":" + member.getPort(), e);
}
@@ -258,7 +258,7 @@ public class WkaBasedMembershipScheme im
} catch (InterruptedException ignored) {
ignored.printStackTrace();
}
- getLocalPort(socket, hostname, portstart, retries, -1);
+ portstart = getLocalPort(socket, hostname, portstart, retries, -1);
}
}
return portstart;
@@ -289,7 +289,7 @@ public class WkaBasedMembershipScheme im
log.debug("Adding Interceptors...");
}
TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
- tcpPingInterceptor.setInterval(100);
+ tcpPingInterceptor.setInterval(10000);
channel.addInterceptor(tcpPingInterceptor);
if (log.isDebugEnabled()) {
log.debug("Added TCP Ping Interceptor");
@@ -299,7 +299,7 @@ public class WkaBasedMembershipScheme im
TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
// tcpFailureDetector.setPrevious(dfi); //TODO: check this
tcpFailureDetector.setReadTestTimeout(120000);
- tcpFailureDetector.setConnectTimeout(60000);
+ tcpFailureDetector.setConnectTimeout(180000);
channel.addInterceptor(tcpFailureDetector);
if (log.isDebugEnabled()) {
log.debug("Added TCP Failure Detector");
@@ -310,7 +310,7 @@ public class WkaBasedMembershipScheme im
staticMembershipInterceptor = new StaticMembershipInterceptor();
staticMembershipInterceptor.setLocalMember(primaryMembershipManager.getLocalMember());
- primaryMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+ primaryMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor);
channel.addInterceptor(staticMembershipInterceptor);
if (log.isDebugEnabled()) {
log.debug("Added Static Membership Interceptor");
@@ -350,7 +350,7 @@ public class WkaBasedMembershipScheme im
// Have multiple RPC channels with multiple RPC request handlers for each localDomain
// This is needed only when this member is running as a load balancer
for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) {
- appDomainMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
+ appDomainMembershipManager.setupStaticMembershipManagement(staticMembershipInterceptor);
// Create an RpcChannel for each localDomain
String domain = new String(appDomainMembershipManager.getDomain());
@@ -377,7 +377,6 @@ public class WkaBasedMembershipScheme im
// Send JOIN message to a WKA member
if (primaryMembershipManager.getMembers().length > 0) {
- log.info("Sending JOIN message to WKA members...");
org.apache.catalina.tribes.Member[] wkaMembers = primaryMembershipManager.getMembers(); // The well-known members
/*try {
Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group
@@ -386,6 +385,7 @@ public class WkaBasedMembershipScheme im
Response[] responses = null;
do {
try {
+ log.info("Sending JOIN message to WKA members...");
responses = rpcMembershipChannel.send(wkaMembers,
new JoinGroupCommand(),
RpcChannel.ALL_REPLY,
@@ -394,10 +394,8 @@ public class WkaBasedMembershipScheme im
10000);
if (responses.length == 0) {
try {
- if (log.isDebugEnabled()) {
- log.debug("No responses received");
- }
- Thread.sleep(500);
+ log.info("No responses received from WKA members");
+ Thread.sleep(5000);
} catch (InterruptedException ignored) {
}
}
Modified: axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java?rev=1295519&r1=1295518&r2=1295519&view=diff
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java (original)
+++ axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringAgent.java Thu Mar 1 11:28:58 2012
@@ -184,4 +184,15 @@ public interface ClusteringAgent extends
* @return the domains of this ClusteringAgent
*/
Set<String> getDomains();
+
+
+ /**
+ * Send a message to all members in this member's primary cluster
+ *
+ * @param msg The message to be sent
+ * @param isRpcMessage Indicates whether the message has to be sent in RPC mode
+ * @return A list of responses if the message is sent in RPC mode
+ * @throws ClusteringFault If an error occurs while sending the message
+ */
+ List<ClusteringCommand> sendMessage(ClusteringMessage msg, boolean isRpcMessage) throws ClusteringFault;
}
\ No newline at end of file
Added: axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java?rev=1295519&view=auto
==============================================================================
--- axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java (added)
+++ axis/axis2/java/core/branches/1_6/modules/kernel/src/org/apache/axis2/clustering/ClusteringMessage.java Thu Mar 1 11:28:58 2012
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.axis2.clustering;
+
+/**
+ * This is a special ClusteringCommand which is used for messaging. If there is a response,
+ * the response can be retrieved from this command
+ */
+public abstract class ClusteringMessage extends ClusteringCommand {
+
+ /**
+ * Get the response for this message
+ * @return the response for this message
+ */
+ public abstract ClusteringCommand getResponse();
+}