You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2007/06/06 16:09:42 UTC
svn commit: r544845 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/configuration/commands/
clustering/src/org/apache/axis2/clustering/context/commands/
clustering/src/org/apache/axis2/clustering/control/ clusteri...
Author: azeez
Date: Wed Jun 6 07:09:41 2007
New Revision: 544845
URL: http://svn.apache.org/viewvc?view=rev&rev=544845
Log:
A few more improvements to the clustering implementation.
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/CommitCommand.java Wed Jun 6 07:09:41 2007
@@ -41,4 +41,8 @@
public void rollback(ConfigurationContext configContext) throws Exception {
// Nothing to implement
}
+
+ public String toString() {
+ return "CommitCommand";
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/ExceptionCommand.java Wed Jun 6 07:09:41 2007
@@ -51,4 +51,8 @@
public void rollback(ConfigurationContext configContext) throws Exception {
// Nothing to implement
}
+
+ public String toString() {
+ return "ExceptionCommand";
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/PrepareCommand.java Wed Jun 6 07:09:41 2007
@@ -41,4 +41,8 @@
public void rollback(ConfigurationContext configContext) throws Exception {
// Nothing to implement
}
+
+ public String toString() {
+ return "PrepareCommand";
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/configuration/commands/RollbackCommand.java Wed Jun 6 07:09:41 2007
@@ -41,4 +41,8 @@
public void rollback(ConfigurationContext configContext) throws Exception {
// Nothing to implement
}
+
+ public String toString() {
+ return "RollbackCommand";
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java Wed Jun 6 07:09:41 2007
@@ -40,4 +40,8 @@
}
}
}
+
+ public String toString() {
+ return "ContextClusteringCommandCollection(" + uniqueId + ")";
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java Wed Jun 6 07:09:41 2007
@@ -16,36 +16,18 @@
package org.apache.axis2.clustering.context.commands;
import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.ContextClusteringCommand;
-import org.apache.axis2.clustering.context.PropertyUpdater;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.PropertyDifference;
-
-import java.util.HashMap;
/**
*
*/
public class UpdateConfigurationContextCommand extends UpdateContextCommand {
- private PropertyUpdater propertyUpdater = new PropertyUpdater();
-
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
propertyUpdater.updateProperties(configurationContext);
}
- public boolean isPropertiesEmpty(){
- if (propertyUpdater.getProperties() == null) {
- propertyUpdater.setProperties(new HashMap());
- return true;
- }
- return propertyUpdater.getProperties().isEmpty();
- }
-
- public void addProperty(PropertyDifference diff) {
- if (propertyUpdater.getProperties() == null) {
- propertyUpdater.setProperties(new HashMap());
- }
- propertyUpdater.addContextProperty(diff);
+ public String toString() {
+ return "UpdateConfigurationContextCommand(" + uniqueId + ")";
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java Wed Jun 6 07:09:41 2007
@@ -16,14 +16,30 @@
package org.apache.axis2.clustering.context.commands;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
+import org.apache.axis2.clustering.context.PropertyUpdater;
import org.apache.axis2.context.PropertyDifference;
+import java.util.HashMap;
+
/**
*
*/
public abstract class UpdateContextCommand extends ContextClusteringCommand {
- public abstract void addProperty(PropertyDifference diff);
-
- public abstract boolean isPropertiesEmpty();
+ protected PropertyUpdater propertyUpdater = new PropertyUpdater();
+
+ public boolean isPropertiesEmpty() {
+ if (propertyUpdater.getProperties() == null) {
+ propertyUpdater.setProperties(new HashMap());
+ return true;
+ }
+ return propertyUpdater.getProperties().isEmpty();
+ }
+
+ public void addProperty(PropertyDifference diff) {
+ if (propertyUpdater.getProperties() == null) {
+ propertyUpdater.setProperties(new HashMap());
+ }
+ propertyUpdater.addContextProperty(diff);
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java Wed Jun 6 07:09:41 2007
@@ -18,17 +18,13 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.PropertyUpdater;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.PropertyDifference;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.context.ServiceGroupContext;
import org.apache.axis2.description.AxisService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.HashMap;
-
/**
*
*/
@@ -36,7 +32,6 @@
private static final Log log = LogFactory.getLog(UpdateServiceContextCommand.class);
- private PropertyUpdater propertyUpdater = new PropertyUpdater();
protected String serviceGroupName;
protected String serviceGroupContextId;
protected String serviceName;
@@ -97,18 +92,7 @@
}
}
- public boolean isPropertiesEmpty() {
- if (propertyUpdater.getProperties() == null) {
- propertyUpdater.setProperties(new HashMap());
- return true;
- }
- return propertyUpdater.getProperties().isEmpty();
- }
-
- public void addProperty(PropertyDifference diff) {
- if (propertyUpdater.getProperties() == null) {
- propertyUpdater.setProperties(new HashMap());
- }
- propertyUpdater.addContextProperty(diff);
+ public String toString() {
+ return "UpdateServiceContextCommand(" + uniqueId + ")";
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java Wed Jun 6 07:09:41 2007
@@ -16,23 +16,18 @@
package org.apache.axis2.clustering.context.commands;
import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.context.PropertyUpdater;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.PropertyDifference;
import org.apache.axis2.context.ServiceGroupContext;
import org.apache.axis2.description.AxisServiceGroup;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.HashMap;
-
/**
*
*/
public class UpdateServiceGroupContextCommand extends UpdateContextCommand {
private static Log log = LogFactory.getLog(UpdateServiceGroupContextCommand.class);
- private PropertyUpdater propertyUpdater = new PropertyUpdater();
protected String serviceGroupName;
protected String serviceGroupContextId;
@@ -70,18 +65,7 @@
propertyUpdater.updateProperties(sgCtx);
}
- public boolean isPropertiesEmpty() {
- if (propertyUpdater.getProperties() == null) {
- propertyUpdater.setProperties(new HashMap());
- return true;
- }
- return propertyUpdater.getProperties().isEmpty();
- }
-
- public void addProperty(PropertyDifference diff) {
- if (propertyUpdater.getProperties() == null) {
- propertyUpdater.setProperties(new HashMap());
- }
- propertyUpdater.addContextProperty(diff);
+ public String toString() {
+ return "UpdateServiceGroupContextCommand(" + uniqueId + ")";
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java Wed Jun 6 07:09:41 2007
@@ -88,4 +88,8 @@
public ContextClusteringCommand[] getCommands() {
return commands;
}
+
+ public String toString() {
+ return "GetStateCommand";
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Wed Jun 6 07:09:41 2007
@@ -45,4 +45,8 @@
public void setCommands(ContextClusteringCommand[] commands) {
this.commands = commands;
}
+
+ public String toString() {
+ return "GetStateResponseCommand";
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/handlers/ReplicationHandler.java Wed Jun 6 07:09:41 2007
@@ -21,6 +21,7 @@
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.context.ContextManager;
import org.apache.axis2.context.*;
+import org.apache.axis2.description.WSDL2Constants;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.commons.logging.Log;
@@ -34,29 +35,51 @@
private static final Log log = LogFactory.getLog(ReplicationHandler.class);
public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
- log.debug("Going to replicate state on invoke");
+// System.err.println("### [INVOKE] Going to replicate state. Flow:" + msgContext.getFLOW());
+ /* log.debug("Going to replicate state on invoke");
try {
replicateState(msgContext);
} catch (Exception e) {
System.err.println("###########################");
- e.printStackTrace();
+ String message = "Could not replicate the state";
+ log.error(message, e);
System.err.println("###########################");
- }
+ }*/
return InvocationResponse.CONTINUE;
}
public void flowComplete(MessageContext msgContext) {
- log.debug("Going to replicate state on flowComplete");
- try {
- replicateState(msgContext);
- } catch (Exception e) {
- String message = "Could not replicate the state";
- log.error(message, e);
+ int flow = msgContext.getFLOW();
+ String mep = msgContext.getAxisOperation().getMessageExchangePattern();
+
+ // The ReplicationHandler should be added to all 4 flows. We will replicate on flowComplete
+ // only during one of the flows
+ boolean replicateOnInFLow =
+ ((mep.equals(WSDL2Constants.MEP_URI_IN_ONLY) ||
+ mep.equals(WSDL2Constants.MEP_URI_IN_OPTIONAL_OUT) ||
+ mep.equals(WSDL2Constants.MEP_URI_ROBUST_IN_ONLY))
+ && (flow == MessageContext.IN_FLOW || flow == MessageContext.IN_FAULT_FLOW));
+
+ boolean replicateOnOutFlow =
+ (mep.equals(WSDL2Constants.MEP_URI_IN_OUT) ||
+ mep.equals(WSDL2Constants.MEP_URI_OUT_ONLY) ||
+ mep.equals(WSDL2Constants.MEP_URI_OUT_OPTIONAL_IN) ||
+ mep.equals(WSDL2Constants.MEP_URI_OUT_IN) ||
+ mep.equals(WSDL2Constants.MEP_URI_ROBUST_OUT_ONLY))
+ && (flow == MessageContext.OUT_FLOW || flow == MessageContext.OUT_FAULT_FLOW);
+
+ if (replicateOnInFLow || replicateOnOutFlow) {
+ System.err.println("### [FLOW COMPLETE] Going to replicate state. Flow:" + flow);
+ try {
+ replicateState(msgContext);
+ } catch (Exception e) {
+ String message = "Could not replicate the state";
+ log.error(message, e);
+ }
}
}
private void replicateState(MessageContext message) throws ClusteringFault {
-
ConfigurationContext configurationContext = message.getConfigurationContext();
AxisConfiguration axisConfiguration = configurationContext.getAxisConfiguration();
ClusterManager clusterManager = axisConfiguration.getClusterManager();
@@ -91,26 +114,26 @@
// Do the actual replication here
if (!contexts.isEmpty()) {
- String msgUUID = contextManager.
- updateContexts((AbstractContext[]) contexts.
+ String msgUUID =
+ contextManager.updateContexts((AbstractContext[]) contexts.
toArray(new AbstractContext[contexts.size()]));
long start = System.currentTimeMillis();
// Wait till all members have ACKed receipt & successful processing of
// the message with UUID 'msgUUID'
- while (!contextManager.isMessageAcknowledged(msgUUID)) {
- if(System.currentTimeMillis() - start > 20000){
- throw new ClusteringFault("ACKs not received from all members within 20 sec. " +
- "Aborting wait.");
- }
+ /*do {
try {
- Thread.sleep(20);
+ Thread.sleep(50);
} catch (InterruptedException e) {
log.error(e);
break;
}
- }
+ if (System.currentTimeMillis() - start > 20000) {
+ throw new ClusteringFault("ACKs not received from all members within 20 sec. " +
+ "Aborting wait.");
+ }
+ } while (!contextManager.isMessageAcknowledged(msgUUID));*/
}
} else {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Wed Jun 6 07:09:41 2007
@@ -59,10 +59,15 @@
for (int i = 0; i < members.length; i++) {
Member member = members[i];
if (!memberList.contains(member.getName())) {
-
+ System.err.println("\n\n");
+ System.err.println("##### NO ACK from member " + member.getName());
+ System.err.println("#### ACKed member list=" + memberList);
+ System.err.println("\n\n");
// At this point, resend the original message back to the node which has not
// sent an ACK
sender.sendToMember(ack.getCommand(), member);
+
+ //TODO: Check whether this is a new member. If so, then send the msg
isAcknowledged = false;
break;
} else {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Wed Jun 6 07:09:41 2007
@@ -17,6 +17,7 @@
package org.apache.axis2.clustering.tribes;
import org.apache.axis2.clustering.ClusteringConstants;
+import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.configuration.ConfigurationClusteringCommand;
import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
@@ -94,7 +95,20 @@
&& !(msg instanceof GetStateResponseCommand)) {
return;
}
- log.debug("RECEIVED MESSAGE " + msg);
+ log.debug("RECEIVED MESSAGE " + msg + " from " + sender.getName());
+
+ // Need to process ACKs as soon as they are received since otherwise,
+ // unnecessary retransmissions will take place
+ if(msg instanceof AckCommand){
+ try {
+ controlCommandProcessor.process((AckCommand) msg, sender);
+ } catch (Exception e) {
+ log.error(e);
+ }
+ return;
+ }
+
+ // Add the commands to be processed to the cmdQueue
synchronized (cmdQueue) {
cmdQueue.enqueue(new MemberMessage(msg, sender));
}
@@ -106,8 +120,8 @@
private void startMessageProcessor() {
messageProcessor = new Thread(new MessageProcessor(), "ClusteringInComingMessageProcessor");
messageProcessor.setDaemon(true);
- messageProcessor.start();
messageProcessor.setPriority(Thread.MAX_PRIORITY);
+ messageProcessor.start();
}
/**
@@ -132,7 +146,7 @@
}
/**
- * A processor which continously polls for messages in the cmdQueue and processes them
+ * A processor which continuously polls for messages in the cmdQueue and processes them
*/
private class MessageProcessor implements Runnable {
public void run() {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Wed Jun 6 07:09:41 2007
@@ -22,9 +22,12 @@
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.Member;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class ChannelSender implements MessageSender {
+ private Log log = LogFactory.getLog(ChannelSender.class);
private Channel channel;
public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
@@ -32,7 +35,8 @@
Member[] members = channel.getMembers();
if (members.length > 0) {
try {
- channel.send(members, msg, 0);
+ channel.send(members, msg, Channel.DEFAULT);
+ log.debug("Sent " + msg + " to group");
} catch (ChannelException e) {
String message = "Error sending command message : " + msg;
throw new ClusteringFault(message, e);
@@ -45,7 +49,8 @@
try {
channel.send(new Member[]{channel.getLocalMember(true)},
msg,
- Channel.SEND_OPTIONS_USE_ACK);
+ Channel.DEFAULT);
+ log.debug("Sent " + msg + " to self");
} catch (ChannelException e) {
throw new ClusteringFault(e);
}
@@ -57,6 +62,7 @@
if (group.length > 0) {
try {
channel.send(group, throwable, 0);
+ log.debug("Sent " + throwable + " to group");
} catch (ChannelException e) {
String message = "Error sending exception message : " + throwable;
throw new ClusteringFault(message, e);
@@ -66,7 +72,8 @@
public void sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
try {
- channel.send(new Member[]{member}, cmd, Channel.SEND_OPTIONS_USE_ACK);
+ channel.send(new Member[]{member}, cmd, Channel.DEFAULT);
+ log.debug("Sent " + cmd + " to " + member.getName());
} catch (ChannelException e) {
throw new ClusteringFault(e);
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed Jun 6 07:09:41 2007
@@ -37,10 +37,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Random;
+import java.util.*;
public class TribesClusterManager implements ClusterManager {
private static final Log log = LogFactory.getLog(TribesClusterManager.class);
@@ -94,9 +91,6 @@
domain = "apache.axis2.domain".getBytes();
}
channel.getMembershipService().setDomain(domain);
-
-// ((McastService)channel.getMembershipService()).setPort(5000);
-
DomainFilterInterceptor dfi = new DomainFilterInterceptor();
dfi.setDomain(domain);
channel.addInterceptor(dfi);
@@ -112,6 +106,16 @@
nbc.setPrevious(dfi);
channel.addInterceptor(nbc);*/
+ /*Properties mcastProps = channel.getMembershipService().getProperties();
+ mcastProps.setProperty("mcastPort", "5555");
+ mcastProps.setProperty("mcastAddress", "224.10.10.10");
+ mcastProps.setProperty("mcastClusterDomain", "catalina");
+ mcastProps.setProperty("bindAddress", "localhost");
+ mcastProps.setProperty("memberDropTime", "20000");
+ mcastProps.setProperty("mcastFrequency", "500");
+ mcastProps.setProperty("tcpListenPort", "4000");
+ mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
+
// TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
// tcpFailureDetector.setPrevious(nbc);
// channel.addInterceptor(tcpFailureDetector);
@@ -127,11 +131,16 @@
channelListener.setContextManager(contextManager);
Member[] members = channel.getMembers();
+ log.info("Local Tribes Member " + channel.getLocalMember(true).getName());
TribesUtil.printMembers(members);
// If there is at least one member in the Tribe, get the current state from a member
Random random = new Random();
int numberOfTries = 0; // Don't keep on trying infinitely
+
+ // Keep track of members to whom we already sent a GetStateCommand
+ // Do not send another request to these members
+ List sentMembersList = new ArrayList();
while (members.length > 0 &&
configurationContext.
getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
@@ -141,9 +150,13 @@
try {
members = channel.getMembers();
int memberIndex = random.nextInt(members.length);
- sender.sendToMember(new GetStateCommand(), members[memberIndex]);
- log.debug("WAITING FOR STATE UPDATE...");
- Thread.sleep(200);
+ Member member = members[memberIndex];
+ if (!sentMembersList.contains(member.getName())) {
+ sender.sendToMember(new GetStateCommand(), member);
+ sentMembersList.add(member.getName());
+ log.debug("WAITING FOR STATE UPDATE...");
+ Thread.sleep(1000);
+ }
} catch (Exception e) {
e.printStackTrace();
break;
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java Wed Jun 6 07:09:41 2007
@@ -22,7 +22,7 @@
public abstract class ContextClusteringCommand extends ClusteringCommand {
- private String uniqueId;
+ protected String uniqueId;
public abstract void execute(ConfigurationContext configContext) throws ClusteringFault;
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java?view=diff&rev=544845&r1=544844&r2=544845
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/description/AxisOperation.java Wed Jun 6 07:09:41 2007
@@ -418,7 +418,7 @@
* Further, in the first lookup, it will cache the looked
* up value so that the subsequent method calls are extremely efficient.
*/
- public int getAxisSpecifMEPConstant() {
+ public int getAxisSpecificMEPConstant() {
if (this.mep != WSDLConstants.MEP_CONSTANT_INVALID) {
return this.mep;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org