You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2007/06/18 08:59:40 UTC
svn commit: r548236 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/context/
clustering/src/org/apache/axis2/clustering/tribes/
kernel/src/org/apache/axis2/clustering/
kernel/src/org/apache/axis2/clustering/context/
Author: azeez
Date: Sun Jun 17 23:59:35 2007
New Revision: 548236
URL: http://svn.apache.org/viewvc?view=rev&rev=548236
Log:
Incorporating a WAIT time to check whether an ACK has arrived, based on the last TTS (time to send). This is a response time improvement.
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Sun Jun 17 23:59:35 2007
@@ -116,6 +116,7 @@
if (!includeAllProperties) {
// Sometimes, there can be failures, so if an exception occurs, we retry
+ // TODO: Need to investigate these failures. Looks like this is because the contexts are not synchronized
while (true) {
Map diffs = context.getPropertyDifferences();
try {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Sun Jun 17 23:59:35 2007
@@ -19,6 +19,7 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
import org.apache.axis2.clustering.tribes.AckManager;
import org.apache.axis2.clustering.tribes.ChannelSender;
@@ -147,7 +148,9 @@
}
} while (sender == null);
try {
- sender.sendToGroup(cmd);
+ long tts = sender.sendToGroup(cmd);
+ configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
+ new Long(tts));
} catch (ClusteringFault clusteringFault) {
throw new RuntimeException(clusteringFault);
}
@@ -155,7 +158,9 @@
};
processorThread.start();
} else {
- sender.sendToGroup(cmd);
+ long tts = sender.sendToGroup(cmd);
+ configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
+ new Long(tts));
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Sun Jun 17 23:59:35 2007
@@ -68,11 +68,15 @@
log.debug("[NO ACK] from member " + memberHost);
log.debug("ACKed member list=" + memberList);
- // At this point, resend the original message back to the node which has not
- // sent an ACK
- sender.sendToMember(ack.getCommand(), member);
+ // If a new member joined the cluster recently,
+ // we need to retransmit the message to this member, if an ACK has not been
+ // received from this member.
+ if (member.getMemberAliveTime() < 1000) { // TODO: Check
+ sender.sendToMember(ack.getCommand(), member);
+ log.debug("Retransimitting msg " + ack.getCommand().getUniqueId() +
+ " to member " + memberHost);
+ }
- //TODO: Enhancement, Check whether this is a new member. If then send the msg
isAcknowledged = false;
break;
} else {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Sun Jun 17 23:59:35 2007
@@ -17,6 +17,7 @@
package org.apache.axis2.clustering.tribes;
import org.apache.axis2.clustering.ClusteringConstants;
+import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand;
import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
@@ -41,7 +42,7 @@
private DefaultContextManager contextManager;
private DefaultConfigurationManager configurationManager;
private TribesControlCommandProcessor controlCommandProcessor;
- private ChannelSender sender;
+ private ChannelSender channelSender;
/**
* The messages received are enqued. Another thread, messageProcessor, will
@@ -64,9 +65,9 @@
this.configurationManager = configurationManager;
this.contextManager = contextManager;
this.controlCommandProcessor = controlCommandProcessor;
- this.sender = sender;
+ this.channelSender = sender;
this.configurationContext = configurationContext;
- startMessageProcessor();
+// startMessageProcessor();
}
public void setContextManager(DefaultContextManager contextManager) {
@@ -98,22 +99,28 @@
// Need to process ACKs as soon as they are received since otherwise,
// unnecessary retransmissions will take place
- if(msg instanceof AckCommand){
+ /*if (msg instanceof AckCommand) {
try {
controlCommandProcessor.process((AckCommand) msg, sender);
} catch (Exception e) {
log.error(e);
}
return;
+ }*/
+
+ try {
+ processMessage(msg,sender);
+ } catch (Exception e) {
+ log.error(e);
}
// Add the commands to be precessed to the cmdQueue
- synchronized (cmdQueue) {
+ /*synchronized (cmdQueue) {
cmdQueue.enqueue(new MemberMessage(msg, sender));
}
if (!messageProcessor.isAlive()) {
startMessageProcessor();
- }
+ }*/
}
private void startMessageProcessor() {
@@ -123,6 +130,32 @@
messageProcessor.start();
}
+
+ private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
+ long start = System.currentTimeMillis();
+ if (msg instanceof ContextClusteringCommand && contextManager != null) {
+ ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
+ contextManager.process(ctxCmd);
+
+ // Sending ACKs for ContextClusteringCommandCollection or
+ // UpdateContextCommand is sufficient
+ if (msg instanceof ContextClusteringCommandCollection ||
+ msg instanceof UpdateContextCommand) {
+ AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
+
+ // Send the ACK
+ this.channelSender.sendToMember(ackCmd, sender);
+ }
+ } else if (msg instanceof ConfigurationClusteringCommand &&
+ configurationManager != null) {
+ configurationManager.process((ConfigurationClusteringCommand) msg);
+ } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
+ controlCommandProcessor.process((ControlCommand) msg,
+ sender);
+ }
+ System.err.println("######### Time to process=" + (System.currentTimeMillis() - start));
+ }
+
/**
* A container to hold a message and its sender
*/
@@ -158,28 +191,7 @@
Thread.sleep(1);
continue;
}
-
- Serializable msg = memberMessage.getMessage();
- if (msg instanceof ContextClusteringCommand && contextManager != null) {
- ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
- contextManager.process(ctxCmd);
-
- // Sending ACKs for ContextClusteringCommandCollection or
- // UpdateContextCommand is sufficient
- if (msg instanceof ContextClusteringCommandCollection ||
- msg instanceof UpdateContextCommand) {
- AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
-
- // Send the ACK
- sender.sendToMember(ackCmd, memberMessage.getSender());
- }
- } else if (msg instanceof ConfigurationClusteringCommand &&
- configurationManager != null) {
- configurationManager.process((ConfigurationClusteringCommand) msg);
- } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
- controlCommandProcessor.process((ControlCommand) msg,
- memberMessage.getSender());
- }
+ processMessage(memberMessage.getMessage(), memberMessage.getSender());
} catch (Throwable e) {
log.error("Could not process message ", e);
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Sun Jun 17 23:59:35 2007
@@ -28,17 +28,20 @@
private Log log = LogFactory.getLog(ChannelSender.class);
private Channel channel;
- public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
+ public long sendToGroup(ClusteringCommand msg) throws ClusteringFault {
if (channel == null) {
- return;
+ return 0;
}
+ long timeToSend = 0;
// Keep retrying, since at the point of trying to send the msg, a member may leave the group
// causing a view change. All nodes in a view should get the msg
while (true) {
if (channel.getMembers().length > 0) {
try {
- channel.send(channel.getMembers(), msg, 0);
+ long start = System.currentTimeMillis();
+ channel.send(channel.getMembers(), msg, Channel.SEND_OPTIONS_USE_ACK);
+ timeToSend = System.currentTimeMillis() - start;
log.debug("Sent " + msg + " to group");
break;
} catch (ChannelException e) {
@@ -50,6 +53,7 @@
break;
}
}
+ return timeToSend;
}
public void sendToSelf(ClusteringCommand msg) throws ClusteringFault {
@@ -59,23 +63,27 @@
try {
channel.send(new Member[]{channel.getLocalMember(true)},
msg,
- 0);
+ Channel.SEND_OPTIONS_USE_ACK);
log.debug("Sent " + msg + " to self");
} catch (ChannelException e) {
throw new ClusteringFault(e);
}
}
- public void sendToGroup(Throwable throwable) throws ClusteringFault {
+ public long sendToGroup(Throwable throwable) throws ClusteringFault {
if (channel == null) {
- return;
+ return 0;
}
+ long timeToSend = 0;
+
// Keep retrying, since at the point of trying to send the msg, a member may leave the group
while (true) {
if (channel.getMembers().length > 0) {
try {
- channel.send(channel.getMembers(), throwable, 0);
+ long start = System.currentTimeMillis();
+ channel.send(channel.getMembers(), throwable, Channel.SEND_OPTIONS_USE_ACK);
+ timeToSend = System.currentTimeMillis() - start;
log.debug("Sent " + throwable + " to group");
} catch (ChannelException e) {
String message = "Error sending exception message : " + throwable +
@@ -86,12 +94,16 @@
break;
}
}
+ return timeToSend;
}
- public void sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
+ public long sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
+ long timeToSend = 0;
try {
if (member.isReady()) {
- channel.send(new Member[]{member}, cmd, 0);
+ long start = System.currentTimeMillis();
+ channel.send(new Member[]{member}, cmd, Channel.SEND_OPTIONS_USE_ACK);
+ timeToSend = System.currentTimeMillis() - start;
log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
}
} catch (ChannelException e) {
@@ -99,6 +111,7 @@
". Reason " + e.getMessage();
log.warn(message);
}
+ return timeToSend;
}
public Channel getChannel() {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sun Jun 17 23:59:35 2007
@@ -152,10 +152,13 @@
int memberIndex = random.nextInt(members.length);
Member member = members[memberIndex];
if (!sentMembersList.contains(TribesUtil.getHost(member))) {
- sender.sendToMember(new GetStateCommand(), member);
+ long tts = sender.sendToMember(new GetStateCommand(), member);
+ configurationContext.
+ setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
+ new Long(tts));
sentMembersList.add(TribesUtil.getHost(member));
log.debug("WAITING FOR STATE UPDATE...");
- Thread.sleep(1000);
+ Thread.sleep(tts + 5);
}
} catch (Exception e) {
e.printStackTrace();
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Sun Jun 17 23:59:35 2007
@@ -25,4 +25,5 @@
public static final String AVOID_INITIATION_KEY = "AvoidInitiation";
public static final String DOMAIN = "domain";
public static final String CLUSTER_INITIALIZED = "local_cluster.initialized";
+ public static final String TIME_TO_SEND = "local_cluster.time.to.send";
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java Sun Jun 17 23:59:35 2007
@@ -20,9 +20,9 @@
*/
public interface MessageSender {
- public void sendToGroup(ClusteringCommand msg) throws ClusteringFault;
+ public long sendToGroup(ClusteringCommand msg) throws ClusteringFault;
public void sendToSelf(ClusteringCommand msg) throws ClusteringFault;
- public void sendToGroup(Throwable throwable) throws ClusteringFault;
+ public long sendToGroup(Throwable throwable) throws ClusteringFault;
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java?view=diff&rev=548236&r1=548235&r2=548236
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java Sun Jun 17 23:59:35 2007
@@ -18,6 +18,7 @@
import org.apache.axis2.clustering.ClusterManager;
import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -88,7 +89,7 @@
}
if (!abstractContext.getPropertyDifferences().isEmpty()) {
String msgUUID = contextManager.updateContext(abstractContext);
- waitForACKs(contextManager, msgUUID);
+ waitForACKs(contextManager, msgUUID, abstractContext.getRootContext());
}
} else {
String msg = "Cannot replicate contexts since " +
@@ -135,7 +136,7 @@
String msgUUID =
contextManager.updateContexts((AbstractContext[]) contexts.
toArray(new AbstractContext[contexts.size()]));
- waitForACKs(contextManager, msgUUID);
+ waitForACKs(contextManager, msgUUID, msgContext.getRootContext());
}
} else {
@@ -146,7 +147,8 @@
}
private static void waitForACKs(ContextManager contextManager,
- String msgUUID) throws ClusteringFault {
+ String msgUUID,
+ ConfigurationContext configCtx) throws ClusteringFault {
long start = System.currentTimeMillis();
// Wait till all members have ACKed receipt & successful processing of
@@ -155,11 +157,17 @@
// Wait sometime before checking whether message is ACKed
try {
- Thread.sleep(50);
+ Long tts =
+ (Long) configCtx.getPropertyNonReplicable(ClusteringConstants.TIME_TO_SEND);
+ if (tts == null) {
+ Thread.sleep(40);
+ } else if (tts.longValue() >= 0) {
+ Thread.sleep(tts.longValue() + 5); // Time to recv ACK + time in queue & processing replication request
+ }
} catch (InterruptedException ignored) {
}
- if (System.currentTimeMillis() - start > 40000) {
- throw new ClusteringFault("ACKs not received from all members within 40 sec. " +
+ if (System.currentTimeMillis() - start > 15000) {
+ throw new ClusteringFault("ACKs not received from all members within 15 sec. " +
"Aborting wait.");
}
} while (!contextManager.isMessageAcknowledged(msgUUID));
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org