You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/01/12 07:24:56 UTC
svn commit: r611371 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/tribes/
kernel/src/org/apache/axis2/clustering/
Author: azeez
Date: Fri Jan 11 22:24:54 2008
New Revision: 611371
URL: http://svn.apache.org/viewvc?rev=611371&view=rev
Log:
1. Removing unncessary method from ClusterManager
2. Handling SYNC_CK & ASYNC type message sending
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=611371&r1=611370&r2=611371&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Fri Jan 11 22:24:54 2008
@@ -38,6 +38,12 @@
private Log log = LogFactory.getLog(ChannelSender.class);
private Channel channel;
+ private boolean synchronizeAllMembers;
+
+ public ChannelSender(Channel channel, boolean synchronizeAllMembers) {
+ this.channel = channel;
+ this.synchronizeAllMembers = synchronizeAllMembers;
+ }
public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
if (channel == null) {
@@ -49,8 +55,12 @@
// causing a view change. All nodes in a view should get the msg
if (members.length > 0) {
try {
- channel.send(members, toByteMessage(msg),
- Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ if (synchronizeAllMembers) {
+ channel.send(members, toByteMessage(msg),
+ Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ } else {
+ channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ }
if (log.isDebugEnabled()) {
log.debug("Sent " + msg + " to group");
}
@@ -61,13 +71,13 @@
throw new ClusteringFault(message, e);
} catch (ChannelException e) {
log.error("Could not send message to some members", e);
-// ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
-// for (int i = 0; i < faultyMembers.length; i++) {
-// ChannelException.FaultyMember faultyMember = faultyMembers[i];
-// Member member = faultyMember.getMember();
-// log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " +
-// faultyMember.getCause(), faultyMember.getCause());
-// }
+ ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+ for (int i = 0; i < faultyMembers.length; i++) {
+ ChannelException.FaultyMember faultyMember = faultyMembers[i];
+ Member member = faultyMember.getMember();
+ log.error("Member " + TribesUtil.getHost(member) + " is faulty",
+ faultyMember.getCause());
+ }
} catch (Exception e) {
String message = "Error sending command message : " + msg +
". Reason " + e.getMessage();
@@ -117,24 +127,13 @@
throw new ClusteringFault(message, e);
} catch (ChannelException e) {
log.error("Could not send message to " + TribesUtil.getHost(member));
-// ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
-// log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " +
-// faultyMembers[0].getCause(), faultyMembers[0].getCause());
-// if (!(e.getCause() instanceof java.net.ConnectException)) { // If it is not a connection exception, we try to resend the message
-// sendToMember(cmd, member);
-// }
+ ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+ log.error("Member " + TribesUtil.getHost(member) + " is faulty",
+ faultyMembers[0].getCause());
} catch (Exception e) {
String message = "Could not send message to " + TribesUtil.getHost(member) +
". Reason " + e.getMessage();
log.warn(message, e);
}
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public void setChannel(Channel channel) {
- this.channel = channel;
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=611371&r1=611370&r2=611371&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri Jan 11 22:24:54 2008
@@ -123,14 +123,12 @@
}
}
- channelSender = new ChannelSender();
- channelListener = new ChannelListener(configurationContext,
- configurationManager,
- contextManager,
- controlCmdProcessor);
+ channel = new GroupChannel();
+ channelSender = new ChannelSender(channel, synchronizeAllMembers());
+ channelListener = new ChannelListener(configurationContext, configurationManager,
+ contextManager, controlCmdProcessor);
controlCmdProcessor.setChannelSender(channelSender);
- channel = new GroupChannel();
// Set the maximum number of retries, if message sending to a particular node fails
Parameter maxRetriesParam = getParameter("maxRetries");
@@ -186,7 +184,6 @@
// OrderInterceptor orderInterceptor = new OrderInterceptor();
-
// Add a AtMostOnceInterceptor to support at-most-once message processing semantics
AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor(channel);
channel.addInterceptor(atMostOnceInterceptor);
@@ -215,7 +212,6 @@
} catch (ChannelException e) {
throw new ClusteringFault("Error starting Tribes channel", e);
}
- channelSender.setChannel(channel);
log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
TribesUtil.printMembers();
@@ -351,6 +347,14 @@
}
}
+ /**
+ * Method to check whether all members in the cluster have to be kep in sync at all times.
+ * Typically, this will require each member in the cluster to ACKnowledge receipt of a
+ * particular message, which may have a significant performance hit.
+ *
+ * @return true - if all members in the cluster should be kept in sync at all times, false
+ * otherwise
+ */
public boolean synchronizeAllMembers() {
Parameter syncAllParam = getParameter(ClusteringConstants.SYNCHRONIZE_ALL_MEMBERS);
return syncAllParam == null || Boolean.parseBoolean((String) syncAllParam.getValue());
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?rev=611371&r1=611370&r2=611371&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Fri Jan 11 22:24:54 2008
@@ -70,14 +70,4 @@
*/
void setConfigurationContext(ConfigurationContext configurationContext);
- /**
- * Method to check whether all members in the cluster have to be kep in sync at all times.
- * Typically, this will require each member in the cluster to ACKnowledge receipt of a
- * particular message, which may have a significant performance hit.
- *
- * @return true - if all members in the cluster should be kept in sync at all times,
- * false otherwise
- */
- boolean synchronizeAllMembers();
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org