You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2007/05/30 13:10:30 UTC
svn commit: r542797 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/context/
clustering/src/org/apache/axis2/clustering/control/
clustering/src/org/apache/axis2/clustering/handlers/
clustering/src/org/apache/axis2...
Author: azeez
Date: Wed May 30 04:10:28 2007
New Revision: 542797
URL: http://svn.apache.org/viewvc?view=rev&rev=542797
Log:
Introducing an ACKing mechanism. The response is sent to the client only if the state change is successfully replicated across the cluster.
Each member has to send an ACK for a particular message received.
Added:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Wed May 30 04:10:28 2007
@@ -15,14 +15,16 @@
*/
package org.apache.axis2.clustering.context;
+import org.apache.axiom.om.util.UUIDGenerator;
import org.apache.axis2.clustering.context.commands.*;
+import org.apache.axis2.clustering.tribes.AckManager;
import org.apache.axis2.context.*;
import org.apache.axis2.deployment.DeploymentConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.axiom.om.util.UUIDGenerator;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -34,6 +36,26 @@
private static final Log log = LogFactory.getLog(ContextClusteringCommandFactory.class);
+ public static ContextClusteringCommandCollection
+ getCommandCollection(AbstractContext[] contexts,
+ Map excludedReplicationPatterns) {
+
+ ArrayList commands = new ArrayList(contexts.length);
+ ContextClusteringCommandCollection collection =
+ new ContextClusteringCommandCollection(commands);
+ for (int i = 0; i < contexts.length; i++) {
+ ContextClusteringCommand cmd = getUpdateCommand(contexts[i],
+ excludedReplicationPatterns,
+ false);
+ if (cmd != null) {
+ commands.add(cmd);
+ }
+ }
+ collection.setUniqueId(UUIDGenerator.getUUID());
+ AckManager.addInitialAcknowledgement(collection);
+ return collection;
+ }
+
/**
* @param context
* @param excludedPropertyPatterns
@@ -82,6 +104,8 @@
}
if (cmd != null && ((UpdateContextCommand) cmd).isPropertiesEmpty()) {
cmd = null;
+ } else {
+ AckManager.addInitialAcknowledgement(cmd);
}
context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
return cmd;
@@ -175,7 +199,7 @@
if (abstractContext instanceof ServiceGroupContext) {
ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext;
ServiceGroupContextCommand cmd = new CreateServiceGroupContextCommand();
- //TODO impl
+ cmd.setUniqueId(UUIDGenerator.getUUID());
cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName());
cmd.setServiceGroupContextId(sgCtx.getId());
return cmd;
@@ -183,6 +207,7 @@
ServiceContext serviceCtx = (ServiceContext) abstractContext;
ServiceContextCommand cmd = new CreateServiceContextCommand();
ServiceGroupContext sgCtx = (ServiceGroupContext) serviceCtx.getParent();
+ cmd.setUniqueId(UUIDGenerator.getUUID());
cmd.setServiceGroupContextId(sgCtx.getId());
cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName());
cmd.setServiceName(serviceCtx.getAxisService().getName());
@@ -196,6 +221,7 @@
ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext;
ServiceGroupContextCommand cmd = new DeleteServiceGroupContextCommand();
// TODO: impl
+ cmd.setUniqueId(UUIDGenerator.getUUID());
cmd.setServiceGroupName(sgCtx.getDescription().getServiceGroupName());
cmd.setServiceGroupContextId(sgCtx.getId());
return cmd;
@@ -203,6 +229,7 @@
ServiceContext serviceCtx = (ServiceContext) abstractContext;
ServiceContextCommand cmd = new DeleteServiceContextCommand();
// TODO: impl
+ cmd.setUniqueId(UUIDGenerator.getUUID());
cmd.setServiceGroupName(serviceCtx.getGroupName());
cmd.setServiceName(serviceCtx.getAxisService().getName());
return cmd;
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Wed May 30 04:10:28 2007
@@ -20,6 +20,8 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.MessageSender;
+import org.apache.axis2.clustering.tribes.AckManager;
+import org.apache.axis2.clustering.tribes.ChannelSender;
import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
@@ -35,56 +37,59 @@
private Map parameters = new HashMap();
- private MessageSender sender;
+ private ChannelSender sender;
private ContextReplicationProcessor processor = new ContextReplicationProcessor();
private Map excludedReplicationPatterns = new HashMap();
- public void setSender(MessageSender sender) {
+ //TODO: Try how to use an interface
+ public void setSender(ChannelSender sender) {
this.sender = sender;
}
public DefaultContextManager() {
}
- public void addContext(final AbstractContext context) throws ClusteringFault {
- processor.process(ContextClusteringCommandFactory.getCreateCommand(context));
+ public String addContext(final AbstractContext context) throws ClusteringFault {
+ ContextClusteringCommand cmd = ContextClusteringCommandFactory.getCreateCommand(context);
+ processor.process(cmd);
+ return cmd.getUniqueId();
}
- public void removeContext(AbstractContext context) throws ClusteringFault {
- processor.process(ContextClusteringCommandFactory.getRemoveCommand(context));
+ public String removeContext(AbstractContext context) throws ClusteringFault {
+ ContextClusteringCommand cmd = ContextClusteringCommandFactory.getRemoveCommand(context);
+ processor.process(cmd);
+ return cmd.getUniqueId();
}
- public void updateContext(AbstractContext context) throws ClusteringFault {
- ContextClusteringCommand message =
+ public String updateContext(AbstractContext context) throws ClusteringFault {
+ ContextClusteringCommand cmd =
ContextClusteringCommandFactory.getUpdateCommand(context,
excludedReplicationPatterns,
false);
- if (message != null) {
- processor.process(message);
+ if (cmd != null) {
+ processor.process(cmd);
+ return cmd.getUniqueId();
}
+ return null;
}
- public void updateContexts(AbstractContext[] contexts) throws ClusteringFault {
- ArrayList commands = new ArrayList(contexts.length);
- ContextClusteringCommandCollection collection =
- new ContextClusteringCommandCollection(commands);
- for (int i = 0; i < contexts.length; i++) {
- ContextClusteringCommand cmd =
- ContextClusteringCommandFactory.getUpdateCommand(contexts[i],
- excludedReplicationPatterns,
- false);
- if (cmd != null) {
- commands.add(cmd);
- }
- }
- processor.process(collection);
+ public String updateContexts(AbstractContext[] contexts) throws ClusteringFault {
+ ContextClusteringCommandCollection cmd =
+ ContextClusteringCommandFactory.getCommandCollection(contexts,
+ excludedReplicationPatterns);
+ processor.process(cmd);
+ return cmd.getUniqueId();
}
public boolean isContextClusterable(AbstractContext context) {
return (context instanceof ConfigurationContext) ||
(context instanceof ServiceContext) ||
(context instanceof ServiceGroupContext);
+ }
+
+ public boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault {
+ return AckManager.isMessageAcknowledged(messageUniqueId, sender);
}
public void process(ContextClusteringCommand command) throws ClusteringFault {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java Wed May 30 04:10:28 2007
@@ -15,13 +15,16 @@
*/
package org.apache.axis2.clustering.control;
-import org.apache.axis2.clustering.ClusteringCommand;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.tribes.AckManager;
+import org.apache.axis2.context.ConfigurationContext;
/**
- * ACK for the message with id <code>uniqueId</code>
+ * ACK for the message with id <code>uniqueId</code>
*/
-public class AckCommand extends ClusteringCommand {
+public class AckCommand extends ControlCommand {
private String uniqueId;
+ private String memberId;
public AckCommand(String messageUniqueId) {
this.uniqueId = messageUniqueId;
@@ -31,7 +34,19 @@
return uniqueId;
}
+ public void setMemberId(String memberId) {
+ this.memberId = memberId;
+ }
+
public int getCommandType() {
return Integer.MAX_VALUE;
+ }
+
+ public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ AckManager.addAcknowledgement(uniqueId, memberId);
+ }
+
+ public String toString() {
+ return "ACK for message with UUID " + uniqueId;
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java Wed May 30 04:10:28 2007
@@ -59,15 +59,17 @@
if (updateCmd != null) {
cmdList.add(updateCmd);
}
- for (Iterator iter2 = sgCtx.getServiceContexts(); iter2.hasNext();) {
- ServiceContext serviceCtx = (ServiceContext) iter2.next();
- cmdList.add(ContextClusteringCommandFactory.getCreateCommand(serviceCtx));
- ContextClusteringCommand updateServiceCtxCmd =
- ContextClusteringCommandFactory.getUpdateCommand(serviceCtx,
- excludedPropPatterns,
- true);
- if (updateServiceCtxCmd != null) {
- cmdList.add(updateServiceCtxCmd);
+ if (sgCtx.getServiceContexts() != null) {
+ for (Iterator iter2 = sgCtx.getServiceContexts(); iter2.hasNext();) {
+ ServiceContext serviceCtx = (ServiceContext) iter2.next();
+ cmdList.add(ContextClusteringCommandFactory.getCreateCommand(serviceCtx));
+ ContextClusteringCommand updateServiceCtxCmd =
+ ContextClusteringCommandFactory.getUpdateCommand(serviceCtx,
+ excludedPropPatterns,
+ true);
+ if (updateServiceCtxCmd != null) {
+ cmdList.add(updateServiceCtxCmd);
+ }
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java Wed May 30 04:10:28 2007
@@ -35,7 +35,13 @@
public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
log.debug("Going to replicate state on invoke");
- replicateState(msgContext);
+ try {
+ replicateState(msgContext);
+ } catch (Exception e) {
+ System.err.println("###########################");
+ e.printStackTrace();
+ System.err.println("###########################");
+ }
return InvocationResponse.CONTINUE;
}
@@ -85,10 +91,28 @@
// Do the actual replication here
if (!contexts.isEmpty()) {
- contextManager.
+ String msgUUID = contextManager.
updateContexts((AbstractContext[]) contexts.
toArray(new AbstractContext[contexts.size()]));
+
+ long start = System.currentTimeMillis();
+
+ // Wait till all members have ACKed receipt & successful processing of
+ // the message with UUID 'msgUUID'
+ while (!contextManager.isMessageAcknowledged(msgUUID)) {
+ if(System.currentTimeMillis() - start > 20000){
+ throw new ClusteringFault("ACKs not received from all members within 1 min. " +
+ "Aborting wait.");
+ }
+ try {
+ Thread.sleep(2);
+ } catch (InterruptedException e) {
+ log.error(e);
+ break;
+ }
+ }
}
+
} else {
String msg = "Cannot replicate contexts since " +
"ClusterManager is not specified in the axis2.xml file.";
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=auto&rev=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Wed May 30 04:10:28 2007
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.context.ContextClusteringCommand;
+import org.apache.catalina.tribes.Member;
+
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+/**
+ *
+ */
+public final class AckManager {
+
+ private static Map messageAckTable = new Hashtable();
+
+ public static void addInitialAcknowledgement(ContextClusteringCommand command) {
+ messageAckTable.put(command.getUniqueId(), new MessageACK(command));
+ }
+
+ public static void addAcknowledgement(String messageUniqueId,
+ String memberId) {
+ MessageACK ack = (MessageACK) messageAckTable.get(messageUniqueId);
+ if (ack != null) {
+ List memberList = ack.getMemberList();
+ memberList.add(memberId);
+ }
+ }
+
+ public static boolean isMessageAcknowledged(String messageUniqueId,
+ ChannelSender sender) throws ClusteringFault {
+ boolean isAcknowledged = false;
+ MessageACK ack = (MessageACK) messageAckTable.get(messageUniqueId);
+ List memberList = ack.getMemberList();
+
+ // Check that all members in the memberList are same as the total member list,
+ // which will indicate that all members have ACKed the message
+ Member[] members = sender.getChannel().getMembers();
+ for (int i = 0; i < members.length; i++) {
+ Member member = members[i];
+ if (!memberList.contains(member.getName())) {
+
+ // At this point, resend the original message back to the node which has not
+ // sent an ACK
+ sender.sendToMember(ack.getCommand(), member);
+ isAcknowledged = false;
+ break;
+ } else {
+ isAcknowledged = true;
+ }
+ }
+
+ // If a message is ACKed, we don't have to keep track of it in our ackTbl anymore
+ if (isAcknowledged) {
+ messageAckTable.remove(messageUniqueId);
+ }
+ return isAcknowledged;
+ }
+
+ private static class MessageACK {
+ private ContextClusteringCommand command;
+ private List memberList = new Vector();
+
+ public MessageACK(ContextClusteringCommand command) {
+ this.command = command;
+ }
+
+ public ContextClusteringCommand getCommand() {
+ return command;
+ }
+
+ public List getMemberList() {
+ return memberList;
+ }
+ }
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Wed May 30 04:10:28 2007
@@ -16,12 +16,17 @@
package org.apache.axis2.clustering.tribes;
+import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand;
import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
import org.apache.axis2.clustering.context.DefaultContextManager;
+import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
+import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
import org.apache.axis2.clustering.control.AckCommand;
import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Member;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,7 +54,10 @@
*/
private Thread messageProcessor;
- public ChannelListener(DefaultConfigurationManager configurationManager,
+ private ConfigurationContext configurationContext;
+
+ public ChannelListener(ConfigurationContext configurationContext,
+ DefaultConfigurationManager configurationManager,
DefaultContextManager contextManager,
TribesControlCommandProcessor controlCommandProcessor,
ChannelSender sender) {
@@ -57,6 +65,7 @@
this.contextManager = contextManager;
this.controlCommandProcessor = controlCommandProcessor;
this.sender = sender;
+ this.configurationContext = configurationContext;
startMessageProcessor();
}
@@ -68,12 +77,24 @@
this.configurationManager = configurationManager;
}
+ public void setConfigurationContext(ConfigurationContext configurationContext) {
+ this.configurationContext = configurationContext;
+ }
+
public boolean accept(Serializable msg, Member sender) {
return true;
}
public void messageReceived(Serializable msg, Member sender) {
- log.debug("Message received : " + msg);
+
+ // If the system has not yet been initialized, reject all incoming messages except the
+ // GetStateResponseCommand message
+ if (configurationContext.
+ getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
+ && !(msg instanceof GetStateResponseCommand)) {
+ return;
+ }
+ log.debug("RECEIVED MESSAGE " + msg);
synchronized (cmdQueue) {
cmdQueue.enqueue(new MemberMessage(msg, sender));
}
@@ -129,10 +150,16 @@
if (msg instanceof ContextClusteringCommand && contextManager != null) {
ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
contextManager.process(ctxCmd);
- AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
- // Send the ACK
- sender.sendToMember(ackCmd, memberMessage.getSender());
+ // Sending ACKs for ContextClusteringCommandCollection or
+ // UpdateContextCommand is sufficient
+ if (msg instanceof ContextClusteringCommandCollection ||
+ msg instanceof UpdateContextCommand) {
+ AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
+
+ // Send the ACK
+ sender.sendToMember(ackCmd, memberMessage.getSender());
+ }
} else if (msg instanceof ConfigurationClusteringCommand &&
configurationManager != null) {
configurationManager.process((ConfigurationClusteringCommand) msg);
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Wed May 30 04:10:28 2007
@@ -22,15 +22,11 @@
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.Member;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
public class ChannelSender implements MessageSender {
private Channel channel;
- private static final Log log = LogFactory.getLog(ChannelSender.class);
-
public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
if(channel == null) return;
Member[] members = channel.getMembers();
@@ -82,14 +78,5 @@
public void setChannel(Channel channel) {
this.channel = channel;
- }
-
- private void printMember(Member member) {
- member.getUniqueId();
- log.debug("\n===============================");
- log.debug("Member Name " + member.getName());
- log.debug("Member Host" + member.getHost());
- log.debug("Member Payload" + member.getPayload());
- log.debug("===============================\n");
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 30 04:10:28 2007
@@ -52,6 +52,7 @@
private ManagedChannel channel;
private ConfigurationContext configurationContext;
private TribesControlCommandProcessor controlCmdProcessor;
+ private ChannelListener channelListener;
public TribesClusterManager() {
parameters = new HashMap();
@@ -69,10 +70,11 @@
public void init() throws ClusteringFault {
ChannelSender sender = new ChannelSender();
- ChannelListener listener = new ChannelListener(configurationManager,
- contextManager,
- controlCmdProcessor,
- sender);
+ channelListener = new ChannelListener(configurationContext,
+ configurationManager,
+ contextManager,
+ controlCmdProcessor,
+ sender);
if (configurationManager != null) {
configurationManager.setSender(sender);
@@ -114,7 +116,7 @@
// tcpFailureDetector.setPrevious(nbc);
// channel.addInterceptor(tcpFailureDetector);
- channel.addChannelListener(listener);
+ channel.addChannelListener(channelListener);
TribesMembershipListener membershipListener = new TribesMembershipListener();
channel.addMembershipListener(membershipListener);
channel.start(Channel.DEFAULT);
@@ -122,7 +124,7 @@
if (contextManager != null) {
contextManager.setSender(sender);
- listener.setContextManager(contextManager);
+ channelListener.setContextManager(contextManager);
Member[] members = channel.getMembers();
TribesUtil.printMembers(members);
@@ -220,5 +222,8 @@
public void setConfigurationContext(ConfigurationContext configurationContext) {
this.configurationContext = configurationContext;
controlCmdProcessor.setConfigurationContext(configurationContext);
+ if (channelListener != null) {
+ channelListener.setConfigurationContext(configurationContext);
+ }
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java Wed May 30 04:10:28 2007
@@ -17,6 +17,7 @@
import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.AckCommand;
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
import org.apache.axis2.clustering.control.GetStateResponseCommand;
@@ -58,6 +59,10 @@
GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
getStateRespCmd.setCommands(((GetStateCommand) command).getCommands());
channelSender.sendToMember(getStateRespCmd, sender);
+ } else if (command instanceof AckCommand) {
+ AckCommand cmd = (AckCommand) command;
+ cmd.setMemberId(sender.getName());
+ cmd.execute(configurationContext);
} else {
command.execute(configurationContext);
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Wed May 30 04:10:28 2007
@@ -17,10 +17,8 @@
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.interceptors.NonBlockingCoordinator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.axis2.context.ConfigurationContext;
/**
*
@@ -28,32 +26,14 @@
public class TribesMembershipListener implements MembershipListener {
private static Log log = LogFactory.getLog(TribesMembershipListener.class);
-// private ConfigurationContext configContext;
public void memberAdded(Member member) {
- log.info("New member " + getHostSocket(member) + " added to Tribes group.");
- /* TODO: Send state information to this member.
- But if all of the members start sending these messages, there is
- it is going to be messy. Need to ensure that only one node send this message*/
-
-// System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
+ log.info("New member " + member.getName() + " joined cluster.");
+ // System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
}
public void memberDisappeared(Member member) {
- log.info("Member " + getHostSocket(member) + " left Tribes group");
+ log.info("Member " + member.getName() + " left cluster");
// System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
}
-
- private String getHostSocket(Member member) {
- String host = null;
- byte[] hostBytes = member.getHost();
- for (int i = 0; i < hostBytes.length; i++) {
- host = (host == null) ? ("" + hostBytes[i]) : (host + "." + hostBytes[i]);
- }
- return host + ":" + member.getPort();
- }/*
-
- public void setConfigContext(ConfigurationContext configContext) {
- this.configContext = configContext;
- }*/
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Wed May 30 04:10:28 2007
@@ -18,6 +18,7 @@
package org.apache.axis2.clustering;
public final class ClusteringConstants {
+
private ClusteringConstants() {
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java?view=diff&rev=542797&r1=542796&r2=542797
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java Wed May 30 04:10:28 2007
@@ -30,17 +30,19 @@
* This method is called when a new {@link AbstractContext} is added to the system
*
* @param context
+ * @return The UUID of the message that was sent to the group communications framework
* @throws ClusteringFault
*/
- void addContext(AbstractContext context) throws ClusteringFault;
+ String addContext(AbstractContext context) throws ClusteringFault;
/**
* This method is called when a new {@link AbstractContext} is removed from the system
*
* @param context
+ * @return The UUID of the message that was sent to the group communications framework
* @throws ClusteringFault
*/
- void removeContext(AbstractContext context) throws ClusteringFault;
+ String removeContext(AbstractContext context) throws ClusteringFault;
/**
* This method is called when properties in an {@link AbstractContext} are updated.
@@ -48,9 +50,10 @@
* removal of properties.
*
* @param context
+ * @return The UUID of the message that was sent to the group communications framework
* @throws ClusteringFault
*/
- void updateContext(AbstractContext context) throws ClusteringFault;
+ String updateContext(AbstractContext context) throws ClusteringFault;
/**
* This method is called when properties in a collection of {@link AbstractContext}s are updated.
@@ -58,17 +61,27 @@
* removal of properties.
*
* @param contexts
+ * @return The UUID of the message that was sent to the group communications framework
* @throws ClusteringFault
*/
- void updateContexts(AbstractContext[] contexts) throws ClusteringFault;
+ String updateContexts(AbstractContext[] contexts) throws ClusteringFault;
/**
- *
* @param context
* @return True - if the provided {@link AbstractContext} is clusterable
* @throws ClusteringFault
*/
boolean isContextClusterable(AbstractContext context) throws ClusteringFault;
+
+ /**
+ * Indicates whether a particular message has been ACKed by all members of a cluster
+ *
+ * @param messageUniqueId
+ * @return true - if all members have ACKed the message with ID <code>messageUniqueId</code>
+ * false - otherwise
+ * @throws ClusteringFault
+ */
+ boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault;
/**
* @param listener
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org