You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/01/11 18:17:24 UTC
svn commit: r611239 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/context/
clustering/src/org/apache/axis2/clustering/context/commands/
clustering/src/org/apache/axis2/clustering/control/
clustering/src/org/apac...
Author: azeez
Date: Fri Jan 11 09:17:19 2008
New Revision: 611239
URL: http://svn.apache.org/viewvc?rev=611239&view=rev
Log:
1. Moving the ACK mechanism down to the Tribes layer - this leads to a reduction in the number of messages we have to send across
2. Introducing an AtMostOnceInterceptor so that duplicate messages can be handled at the Tribes layer itself - there is no need to deserialize the message to find out whether it's a duplicate. We can use the
UUID appended by Tribes
Added:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
Removed:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/AckCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Fri Jan 11 09:17:19 2008
@@ -26,7 +26,6 @@
import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
import org.apache.axis2.clustering.context.commands.UpdateServiceContextCommand;
import org.apache.axis2.clustering.context.commands.UpdateServiceGroupContextCommand;
-import org.apache.axis2.clustering.tribes.AckManager;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.PropertyDifference;
@@ -64,8 +63,6 @@
commands.add(cmd);
}
}
- collection.setUniqueId(UUIDGenerator.getUUID());
- AckManager.addInitialAcknowledgement(collection);
return collection;
}
@@ -82,15 +79,12 @@
UpdateContextCommand cmd = toUpdateContextCommand(context);
if (cmd != null) {
- cmd.setUniqueId(UUIDGenerator.getUUID());
fillProperties(cmd,
context,
excludedPropertyPatterns,
includeAllProperties);
if (cmd.isPropertiesEmpty()) {
cmd = null;
- } else {
- AckManager.addInitialAcknowledgement(cmd);
}
}
@@ -107,13 +101,10 @@
UpdateContextCommand cmd = toUpdateContextCommand(context);
if (cmd != null) {
- cmd.setUniqueId(UUIDGenerator.getUUID());
fillProperties(cmd, context, propertyNames);
if (cmd.isPropertiesEmpty()) {
cmd = null;
- } else {
- AckManager.addInitialAcknowledgement(cmd);
- }
+ }
}
synchronized (context) {
@@ -272,7 +263,6 @@
if (abstractContext instanceof ServiceGroupContext) {
ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext;
DeleteServiceGroupContextCommand cmd = new DeleteServiceGroupContextCommand();
- cmd.setUniqueId(UUIDGenerator.getUUID());
cmd.setServiceGroupContextId(sgCtx.getId());
return cmd;
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Fri Jan 11 09:17:19 2008
@@ -22,9 +22,7 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
-import org.apache.axis2.clustering.tribes.AckManager;
import org.apache.axis2.clustering.tribes.ChannelSender;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
@@ -32,7 +30,11 @@
import org.apache.axis2.context.ServiceGroupContext;
import org.apache.axis2.description.Parameter;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
public class DefaultContextManager implements ContextManager {
@@ -42,7 +44,6 @@
private Map parameters = new HashMap();
private ChannelSender sender;
- private ContextReplicationProcessor processor = new ContextReplicationProcessor();
private Map excludedReplicationPatterns = new HashMap();
@@ -54,41 +55,35 @@
public DefaultContextManager() {
}
- public String updateContext(AbstractContext context) throws ClusteringFault {
+ public void updateContext(AbstractContext context) throws ClusteringFault {
ContextClusteringCommand cmd =
ContextClusteringCommandFactory.getUpdateCommand(context,
excludedReplicationPatterns,
false);
if (cmd != null) {
- processor.process(cmd);
- return cmd.getUniqueId();
+ sender.sendToGroup(cmd);
}
- return null;
}
- public String updateContext(AbstractContext context,
+ public void updateContext(AbstractContext context,
String[] propertyNames) throws ClusteringFault {
ContextClusteringCommand cmd =
ContextClusteringCommandFactory.getUpdateCommand(context, propertyNames);
if (cmd != null) {
- processor.process(cmd);
- return cmd.getUniqueId();
+ sender.sendToGroup(cmd);
}
- return null;
}
- public String updateContexts(AbstractContext[] contexts) throws ClusteringFault {
+ public void updateContexts(AbstractContext[] contexts) throws ClusteringFault {
ContextClusteringCommandCollection cmd =
ContextClusteringCommandFactory.getCommandCollection(contexts,
excludedReplicationPatterns);
- processor.process(cmd);
- return cmd.getUniqueId();
+ sender.sendToGroup(cmd);
}
- public String removeContext(AbstractContext context) throws ClusteringFault {
+ public void removeContext(AbstractContext context) throws ClusteringFault {
ContextClusteringCommand cmd = ContextClusteringCommandFactory.getRemoveCommand(context);
- processor.process(cmd);
- return cmd.getUniqueId();
+ sender.sendToGroup(cmd);
}
public boolean isContextClusterable(AbstractContext context) {
@@ -97,14 +92,6 @@
(context instanceof ServiceGroupContext);
}
- public boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault {
- return AckManager.isMessageAcknowledged(messageUniqueId, sender);
- }
-
- public void process(ContextClusteringCommand command) throws ClusteringFault {
- command.execute(configContext);
- }
-
public void setContextManagerListener(ContextManagerListener listener) {
this.listener = listener;
if (configContext != null) {
@@ -114,7 +101,7 @@
public void setConfigurationContext(ConfigurationContext configurationContext) {
this.configContext = configurationContext;
- if(listener != null){
+ if (listener != null) {
listener.setConfigurationContext(configContext);
}
}
@@ -156,43 +143,4 @@
throw new UnsupportedOperationException();
}
// ---------------------------------------------------------------------------------------------
-
- private class ContextReplicationProcessor {
- public void process(final ContextClusteringCommand cmd) throws ClusteringFault {
-
- // If the sender is NULL, it means the TribesClusterManager is still being initialized
- // So we need to busy wait.
- if (sender == null) {
- Thread processorThread = new Thread("ProcessorThread") {
- public void run() {
- do {
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } while (sender == null);
- try {
- long tts = sender.sendToGroup(cmd);
- configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
- new Long(tts));
- } catch (ClusteringFault clusteringFault) {
- AckManager.removeMessage(cmd.getUniqueId());
- throw new RuntimeException(clusteringFault);
- }
- }
- };
- processorThread.start();
- } else {
- try {
- long tts = sender.sendToGroup(cmd);
- configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
- new Long(tts));
- } catch (ClusteringFault clusteringFault) {
- AckManager.removeMessage(cmd.getUniqueId());
- throw clusteringFault;
- }
- }
- }
- }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java Fri Jan 11 09:17:19 2008
@@ -45,6 +45,6 @@
}
public String toString() {
- return "ContextClusteringCommandCollection(" + uniqueId + ")";
+ return "ContextClusteringCommandCollection";
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java Fri Jan 11 09:17:19 2008
@@ -31,6 +31,6 @@
}
public String toString() {
- return "UpdateConfigurationContextCommand(" + uniqueId + ")";
+ return "UpdateConfigurationContextCommand";
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java Fri Jan 11 09:17:19 2008
@@ -102,6 +102,6 @@
}
public String toString() {
- return "UpdateServiceContextCommand(" + uniqueId + ")";
+ return "UpdateServiceContextCommand";
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java Fri Jan 11 09:17:19 2008
@@ -73,6 +73,6 @@
}
public String toString() {
- return "UpdateServiceGroupContextCommand(" + uniqueId + ")";
+ return "UpdateServiceGroupContextCommand";
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java Fri Jan 11 09:17:19 2008
@@ -44,8 +44,10 @@
// Run this code only if this node is not already initialized
if (configContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ getPropertyNonReplicable(ClusteringConstants.RECD_CONFIG_INIT_MSG) == null) {
log.info("Received configuration initialization message");
+ configContext.
+ setNonReplicableProperty(ClusteringConstants.RECD_CONFIG_INIT_MSG, "true");
if (serviceGroups != null) {
// Load all the service groups that are sent by the neighbour
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Fri Jan 11 09:17:19 2008
@@ -34,17 +34,17 @@
private ContextClusteringCommand[] commands;
- public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ public void execute(ConfigurationContext configContext) throws ClusteringFault {
// Run this code only if this node is not already initialized
- if (configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ if (configContext.
+ getPropertyNonReplicable(ClusteringConstants.RECD_STATE_INIT_MSG) == null) {
+ configContext.
+ setNonReplicableProperty(ClusteringConstants.RECD_STATE_INIT_MSG, "true");
log.info("Received state initialization message");
- configurationContext.
- setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
if (commands != null) {
for (int i = 0; i < commands.length; i++) {
- commands[i].execute(configurationContext);
+ commands[i].execute(configContext);
}
}
}
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=611239&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Fri Jan 11 09:17:19 2008
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ByteMessage;
+import org.apache.catalina.tribes.util.UUIDGenerator;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.group.InterceptorPayload;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.AxisModule;
+
+import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.Timer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.net.Socket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.ConnectException;
+import java.io.Serializable;
+
+/**
+ * Message interceptor that enforces at-most-once processing: the first copy of a message is
+ * recorded and passed up the interceptor stack, later copies are discarded.
+ * NOTE(review): duplicate detection relies on ChannelMessage equals()/hashCode() being derived
+ * from the unique id that Tribes attaches to each message - confirm against ChannelData.
+ */
+public class AtMostOnceInterceptor extends ChannelInterceptorBase {
+
+    private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class);
+
+    /**
+     * Received message -> arrival time (Long). Shared by the receiver threads and the cleanup
+     * timer thread, so every access must hold the monitor of this map.
+     */
+    private static final Map receivedMessages = new HashMap();
+
+    /**
+     * The time (in milliseconds) a message lives in the receivedMessages Map
+     */
+    private static final int TIMEOUT = 5 * 60 * 1000;
+
+    private Channel channel;
+
+    /** Creates the interceptor for the given channel. */
+    public AtMostOnceInterceptor(Channel channel) {
+        this();
+        this.channel = channel;
+    }
+
+    /**
+     * Schedules the periodic task which evicts entries that are older than TIMEOUT.
+     */
+    public AtMostOnceInterceptor() {
+        TimerTask cleanupTask = new TimerTask() {
+            public void run() {
+                long now = System.currentTimeMillis();
+                synchronized (receivedMessages) {
+                    for (Iterator iter = receivedMessages.values().iterator(); iter.hasNext();) {
+                        long arrivalTime = ((Long) iter.next()).longValue();
+                        if (now - arrivalTime >= TIMEOUT) {
+                            iter.remove(); // safe removal while iterating
+                        }
+                    }
+                }
+            }
+        };
+        // Daemon timer, so that the cleanup thread never keeps the JVM alive
+        new Timer(true).scheduleAtFixedRate(cleanupTask, TIMEOUT, TIMEOUT);
+    }
+
+    /**
+     * Passes msg up the stack only the first time it is seen; duplicates are logged and dropped.
+     * @param msg the incoming message
+     */
+    public void messageReceived(ChannelMessage msg) {
+        boolean isNew;
+        synchronized (receivedMessages) { // check-then-record must be atomic
+            isNew = !receivedMessages.containsKey(msg);
+            if (isNew) {
+                receivedMessages.put(msg, new Long(System.currentTimeMillis()));
+            }
+        }
+        if (isNew) {
+            // Delivered outside the lock so that slow processing does not block other receivers
+            super.messageReceived(msg);
+        } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
+            log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
+        }
+    }
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Fri Jan 11 09:17:19 2008
@@ -25,9 +25,6 @@
import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
import org.apache.axis2.clustering.context.DefaultContextManager;
-import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
-import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
-import org.apache.axis2.clustering.control.AckCommand;
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
import org.apache.axis2.clustering.control.GetStateResponseCommand;
@@ -37,54 +34,33 @@
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
public class ChannelListener implements org.apache.catalina.tribes.ChannelListener {
private static final Log log = LogFactory.getLog(ChannelListener.class);
- /**
- * The time a message lives in the receivedMessages Map
- */
- private static final int TIME_TO_LIVE = 5 * 60 * 1000; // 5 mins
-
private DefaultContextManager contextManager;
private DefaultConfigurationManager configurationManager;
private TribesControlCommandProcessor controlCommandProcessor;
- private ChannelSender channelSender;
private ConfigurationContext configurationContext;
- private boolean synchronizeAllMembers;
-
- private Map receivedMessages = new HashMap();
public ChannelListener(ConfigurationContext configurationContext,
DefaultConfigurationManager configurationManager,
DefaultContextManager contextManager,
- TribesControlCommandProcessor controlCommandProcessor,
- ChannelSender sender,
- boolean synchronizeAllMembers) {
+ TribesControlCommandProcessor controlCommandProcessor) {
this.configurationManager = configurationManager;
this.contextManager = contextManager;
this.controlCommandProcessor = controlCommandProcessor;
- this.channelSender = sender;
this.configurationContext = configurationContext;
- this.synchronizeAllMembers = synchronizeAllMembers;
-
- Timer cleanupTimer = new Timer();
- cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(),
- TIME_TO_LIVE,
- TIME_TO_LIVE);
}
public void setContextManager(DefaultContextManager contextManager) {
@@ -106,8 +82,7 @@
* @return boolean
*/
public boolean accept(Serializable msg, Member sender) {
- return configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) != null;
+ return true;
}
/**
@@ -137,8 +112,9 @@
message.length,
(ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
} catch (Exception e) {
- log.error("Cannot deserialize received message", e);
- return;
+ String errMsg = "Cannot deserialize received message";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
}
// If the system has not still been intialized, reject all incoming messages, except the
@@ -160,70 +136,20 @@
try {
processMessage(msg, sender);
} catch (Exception e) {
- log.error("Cannot process message", e);
+ String errMsg = "Cannot process received message";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
}
}
private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
- //TODO: Handle ACK implosion?
-
if (msg instanceof ContextClusteringCommand && contextManager != null) {
ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
- String msgId = ctxCmd.getUniqueId();
-
- // Check for duplicate messages and ignore duplicates in order to support at-most-once semantics
- if (receivedMessages.containsKey(msgId)) {
- if (log.isDebugEnabled()) {
- log.debug("Received duplicate message " + ctxCmd);
- }
- return;
- }
- synchronized (receivedMessages) {
- receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was first received
- }
-
- // Process the message
- contextManager.process(ctxCmd);
-
- // Sending ACKs for ContextClusteringCommandCollection or
- // UpdateContextCommand is sufficient
- if (synchronizeAllMembers) { // Send ACK only if the relevant cluster config parameter is set
- if (msg instanceof ContextClusteringCommandCollection ||
- msg instanceof UpdateContextCommand) {
- AckCommand ackCmd = new AckCommand(msgId);
-
- // Send the ACK
- this.channelSender.sendToMember(ackCmd, sender);
- }
- }
- } else if (msg instanceof ConfigurationClusteringCommand &&
- configurationManager != null) {
+ ctxCmd.execute(configurationContext);
+ } else if (msg instanceof ConfigurationClusteringCommand && configurationManager != null) {
configurationManager.process((ConfigurationClusteringCommand) msg);
} else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
controlCommandProcessor.process((ControlCommand) msg, sender);
- }
- }
-
- private class ReceivedMessageCleanupTask extends TimerTask {
-
- public void run() {
- List toBeRemoved = new ArrayList();
- synchronized (receivedMessages) {
- for (Iterator iter = receivedMessages.keySet().iterator(); iter.hasNext();) {
- String msgId = (String) iter.next();
- Long recdTime = (Long) receivedMessages.get(msgId);
- if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) {
- toBeRemoved.add(msgId);
- }
- }
- for (Iterator iter = toBeRemoved.iterator(); iter.hasNext();) {
- String msgId = (String) iter.next();
- receivedMessages.remove(msgId);
- if (log.isDebugEnabled()) {
- log.debug("Removed message " + msgId + " from received message buffer");
- }
- }
- }
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Fri Jan 11 09:17:19 2008
@@ -39,23 +39,21 @@
private Log log = LogFactory.getLog(ChannelSender.class);
private Channel channel;
- public long sendToGroup(ClusteringCommand msg) throws ClusteringFault {
+ public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
if (channel == null) {
- return 0;
+ return;
}
- long timeToSend = 0;
-
Member[] members = MembershipManager.getMembers();
// Keep retrying, since at the point of trying to send the msg, a member may leave the group
// causing a view change. All nodes in a view should get the msg
if (members.length > 0) {
try {
- long start = System.currentTimeMillis();
- channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
-// channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
- timeToSend = System.currentTimeMillis() - start;
- log.debug("Sent " + msg + " to group");
+ channel.send(members, toByteMessage(msg),
+ Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ if (log.isDebugEnabled()) {
+ log.debug("Sent " + msg + " to group");
+ }
} catch (NotSerializableException e) {
String message = "Could not send command message " + msg +
" to group since it is not serializable.";
@@ -76,7 +74,6 @@
log.warn(message, e);
}
}
- return timeToSend;
}
private ByteMessage toByteMessage(ClusteringCommand msg) throws IOException {
@@ -96,21 +93,22 @@
channel.send(new Member[]{channel.getLocalMember(true)},
toByteMessage(msg),
Channel.SEND_OPTIONS_USE_ACK);
- log.debug("Sent " + msg + " to self");
+ if (log.isDebugEnabled()) {
+ log.debug("Sent " + msg + " to self");
+ }
} catch (Exception e) {
throw new ClusteringFault(e);
}
}
- public long sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
- long timeToSend = 0;
+ public void sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
try {
if (member.isReady()) {
- long start = System.currentTimeMillis();
- channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
-// channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_USE_ACK);
- timeToSend = System.currentTimeMillis() - start;
- log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
+ channel.send(new Member[]{member}, toByteMessage(cmd),
+ Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ if (log.isDebugEnabled()) {
+ log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
+ }
}
} catch (NotSerializableException e) {
String message = "Could not send command message to " + TribesUtil.getHost(member) +
@@ -130,7 +128,6 @@
". Reason " + e.getMessage();
log.warn(message, e);
}
- return timeToSend;
}
public Channel getChannel() {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri Jan 11 09:17:19 2008
@@ -47,7 +47,6 @@
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
-import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.catalina.tribes.transport.ReplicationTransmitter;
import org.apache.commons.logging.Log;
@@ -128,9 +127,7 @@
channelListener = new ChannelListener(configurationContext,
configurationManager,
contextManager,
- controlCmdProcessor,
- channelSender,
- synchronizeAllMembers());
+ controlCmdProcessor);
controlCmdProcessor.setChannelSender(channelSender);
channel = new GroupChannel();
@@ -159,6 +156,8 @@
} else {
domain = "apache.axis2.domain".getBytes();
}
+
+ // Add a DomainFilterInterceptor
channel.getMembershipService().setDomain(domain);
DomainFilterInterceptor dfi = new DomainFilterInterceptor();
dfi.setDomain(domain);
@@ -186,10 +185,17 @@
mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
// OrderInterceptor orderInterceptor = new OrderInterceptor();
-
+
+ // Add an AtMostOnceInterceptor to support at-most-once message processing semantics
+ AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor(channel);
+ channel.addInterceptor(atMostOnceInterceptor);
+ atMostOnceInterceptor.setPrevious(dfi);
+
+ // Add a reliable failure detector
TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
- tcpFailureDetector.setPrevious(dfi);
+// tcpFailureDetector.setPrevious(dfi);
+ tcpFailureDetector.setPrevious(atMostOnceInterceptor);
channel.addInterceptor(tcpFailureDetector);
channel.addChannelListener(channelListener);
@@ -211,7 +217,6 @@
}
channelSender.setChannel(channel);
-// Member[] members = channel.getMembers();
log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
TribesUtil.printMembers();
@@ -250,6 +255,7 @@
List sentMembersList = new ArrayList();
sentMembersList.add(TribesUtil.getLocalHost(channel));
Member[] members = MembershipManager.getMembers();
+
while (members.length > 0 &&
configurationContext.
getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
@@ -261,13 +267,10 @@
MembershipManager.getLongestLivingMember() : // First try to get from the longest alive member
MembershipManager.getRandomMember(); // Else get from a random member
if (!sentMembersList.contains(TribesUtil.getHost(member))) {
- long tts = sender.sendToMember(command, member);
- configurationContext.
- setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
- new Long(tts));
+ sender.sendToMember(command, member);
sentMembersList.add(TribesUtil.getHost(member));
log.debug("WAITING FOR INITIALIZATION MESSAGE...");
- Thread.sleep(tts + 5 * (numberOfTries + 1));
+ Thread.sleep(10 * (numberOfTries + 1));
}
} catch (Exception e) {
log.error("Cannot get initialization information", e);
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesControlCommandProcessor.java Fri Jan 11 09:17:19 2008
@@ -20,12 +20,11 @@
import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.control.AckCommand;
import org.apache.axis2.clustering.control.ControlCommand;
-import org.apache.axis2.clustering.control.GetStateCommand;
-import org.apache.axis2.clustering.control.GetStateResponseCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
+import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Member;
@@ -77,10 +76,6 @@
getConfigRespCmd.
setServiceGroups(((GetConfigurationCommand) command).getServiceGroupNames());
channelSender.sendToMember(getConfigRespCmd, sender);
- } else if (command instanceof AckCommand) {
- AckCommand cmd = (AckCommand) command;
- cmd.setMemberId(TribesUtil.getHost(sender));
- cmd.execute(configurationContext);
} else {
command.execute(configurationContext);
}
Modified: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ContextReplicationTest.java Fri Jan 11 09:17:19 2008
@@ -31,6 +31,7 @@
import org.apache.axis2.context.ServiceGroupContext;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.description.Parameter;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.transport.http.server.HttpUtils;
@@ -38,12 +39,16 @@
import java.util.List;
/**
- *
+ * Tests the replication of properties placed in the ConfigurationContext, ServiceGroupContext &
+ * ServiceContext
*/
public class ContextReplicationTest extends TestCase {
private static final String TEST_SERVICE_NAME = "testService";
+ private static final Parameter domainParam =
+ new Parameter(ClusteringConstants.DOMAIN, "axis2.domain." + UUIDGenerator.getUUID());
+
// --------------- Cluster-1 ------------------------------------------------------
private ClusterManager clusterManager1;
private ContextManager ctxMan1;
@@ -73,6 +78,7 @@
ctxMan1 = getContextManager();
configMan1 = getConfigurationManager();
clusterManager1 = getClusterManager(configurationContext1, ctxMan1, configMan1);
+ clusterManager1.addParameter(domainParam);
clusterManager1.init();
System.out.println("ClusterManager-1 successfully initialized");
@@ -84,6 +90,7 @@
ctxMan2 = getContextManager();
configMan2 = getConfigurationManager();
clusterManager2 = getClusterManager(configurationContext2, ctxMan2, configMan2);
+ clusterManager2.addParameter(domainParam);
clusterManager2.init();
System.out.println("ClusterManager-2 successfully initialized");
}
@@ -139,8 +146,6 @@
String val1 = "configCtxVal1";
configurationContext1.setProperty(key1, val1);
ctxMan1.updateContext(configurationContext1);
- Thread.sleep(1000); // Give some time for the replication to take place
-
String value = (String) configurationContext2.getProperty(key1);
assertEquals(val1, value);
}
@@ -150,8 +155,6 @@
String val2 = "configCtxVal1";
configurationContext2.setProperty(key2, val2);
ctxMan2.updateContext(configurationContext2);
- Thread.sleep(1000); // Give some time for the replication to take place
-
String value = (String) configurationContext1.getProperty(key2);
assertEquals(val2, value);
}
@@ -165,8 +168,6 @@
{
configurationContext1.setProperty(key1, val1);
ctxMan1.updateContext(configurationContext1);
- Thread.sleep(1000); // Give some time for the replication to take place
-
String value = (String) configurationContext2.getProperty(key1);
assertEquals(val1, value);
}
@@ -174,8 +175,6 @@
// Next remove this property from cluster 2, replicate it, and check that it is unavailable in cluster 1
configurationContext2.removeProperty(key1);
ctxMan2.updateContext(configurationContext2);
- Thread.sleep(1000); // Give some time for the replication to take place
-
String value = (String) configurationContext1.getProperty(key1);
assertNull(configurationContext2.getProperty(key1));
assertNull(value);
@@ -199,8 +198,6 @@
String val1 = "sgCtxVal1";
serviceGroupContext1.setProperty(key1, val1);
ctxMan1.updateContext(serviceGroupContext1);
-
- Thread.sleep(1000);
assertEquals(val1, serviceGroupContext2.getProperty(key1));
}
@@ -223,15 +220,12 @@
String val1 = "sgCtxVal1";
serviceGroupContext1.setProperty(key1, val1);
ctxMan1.updateContext(serviceGroupContext1);
-
- Thread.sleep(1000);
assertEquals(val1, serviceGroupContext2.getProperty(key1));
// Remove the property
serviceGroupContext2.removeProperty(key1);
assertNull(serviceGroupContext2.getProperty(key1));
ctxMan1.updateContext(serviceGroupContext2);
- Thread.sleep(1000);
assertNull(serviceGroupContext1.getProperty(key1));
}
@@ -255,7 +249,6 @@
serviceGroupContext1.setProperty(key1, val1);
ctxMan1.updateContext(serviceGroupContext1);
- Thread.sleep(1000);
assertEquals(val1, serviceGroupContext2.getProperty(key1));
}
@@ -281,14 +274,12 @@
serviceGroupContext1.setProperty(key1, val1);
ctxMan1.updateContext(serviceGroupContext1);
- Thread.sleep(1000);
assertEquals(val1, serviceGroupContext2.getProperty(key1));
// Remove the property
serviceGroupContext2.removeProperty(key1);
assertNull(serviceGroupContext2.getProperty(key1));
ctxMan1.updateContext(serviceGroupContext2);
- Thread.sleep(1000);
assertNull(serviceGroupContext1.getProperty(key1));
}
@@ -315,7 +306,6 @@
serviceContext1.setProperty(key1, val1);
ctxMan1.updateContext(serviceContext1);
- Thread.sleep(1000);
assertEquals(val1, serviceContext2.getProperty(key1));
}
@@ -342,7 +332,6 @@
serviceContext1.setProperty(key1, val1);
ctxMan1.updateContext(serviceContext1);
- Thread.sleep(1000);
assertEquals(val1, serviceContext2.getProperty(key1));
}
@@ -370,14 +359,12 @@
serviceContext1.setProperty(key1, val1);
ctxMan1.updateContext(serviceContext1);
- Thread.sleep(1000);
assertEquals(val1, serviceContext2.getProperty(key1));
// Remove the property
serviceContext2.removeProperty(key1);
assertNull(serviceContext2.getProperty(key1));
ctxMan1.updateContext(serviceContext2);
- Thread.sleep(1000);
assertNull(serviceContext1.getProperty(key1));
}
@@ -405,14 +392,12 @@
serviceContext1.setProperty(key1, val1);
ctxMan1.updateContext(serviceContext1);
- Thread.sleep(1000);
assertEquals(val1, serviceContext2.getProperty(key1));
// Remove the property
serviceContext2.removeProperty(key1);
assertNull(serviceContext2.getProperty(key1));
ctxMan1.updateContext(serviceContext2);
- Thread.sleep(1000);
assertNull(serviceContext1.getProperty(key1));
}
@@ -424,7 +409,6 @@
exclusionPatterns.add("local_*");
ctxMan1.setReplicationExcludePatterns("defaults", exclusionPatterns);
ctxMan1.updateContext(configurationContext1);
- Thread.sleep(1000); // Give some time for the replication to take place
String value = (String) configurationContext2.getProperty(key1);
assertNull(value); // The property should not have gotten replicated
@@ -439,7 +423,6 @@
ctxMan1.setReplicationExcludePatterns("org.apache.axis2.context.ConfigurationContext",
exclusionPatterns);
ctxMan1.updateContext(configurationContext1);
- Thread.sleep(1000); // Give some time for the replication to take place
String value = (String) configurationContext2.getProperty(key1);
assertNull(value); // The property should not have gotten replicated
@@ -467,7 +450,6 @@
ctxMan1.setReplicationExcludePatterns("defaults",
exclusionPatterns2);
ctxMan1.updateContext(configurationContext1);
- Thread.sleep(1000); // Give some time for the replication to take place
String value1 = (String) configurationContext2.getProperty(key1);
assertNull(value1); // The property should not have gotten replicated
Modified: webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/test/org/apache/axis2/clustering/ObjectSerializationTest.java Fri Jan 11 09:17:19 2008
@@ -16,8 +16,6 @@
package org.apache.axis2.clustering;
import junit.framework.TestCase;
-import org.apache.axis2.clustering.control.AckCommand;
-import org.apache.axis2.clustering.TestDO;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -31,17 +29,6 @@
public class ObjectSerializationTest extends TestCase {
public void testSerialization() throws IOException, ClassNotFoundException {
- AckCommand ackCommand = new AckCommand("uuid");
- ackCommand.setMemberId("123456");
-
- AckCommand ackCommand2 = (AckCommand) copy(ackCommand);
-
- assertNotNull(ackCommand2);
- assertFalse(ackCommand.equals(ackCommand2));
- assertEquals(ackCommand.getUniqueId(), ackCommand2.getUniqueId());
- }
-
- public void testSerialization2() throws IOException, ClassNotFoundException {
TestDO testDO = new TestDO("name", "value");
TestDO testDO2 = (TestDO) copy(testDO);
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Fri Jan 11 09:17:19 2008
@@ -39,7 +39,8 @@
public static final String NODE_MANAGER_SERVICE = "Axis2NodeManager";
public static final String REQUEST_BLOCKING_HANDLER = "RequestBlockingHandler";
public static final String CLUSTER_INITIALIZED = "local_cluster.initialized";
- public static final String TIME_TO_SEND = "local_cluster.time.to.send";
+ public static final String RECD_CONFIG_INIT_MSG = "local_recd.config.init.method";
+ public static final String RECD_STATE_INIT_MSG = "local_recd.state.init.method";
public static final String BLOCK_ALL_REQUESTS = "local_wso2wsas.block.requests";
public static final String LOCAL_IP_ADDRESS = "axis2.local.ip.address";
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/MessageSender.java Fri Jan 11 09:17:19 2008
@@ -23,7 +23,7 @@
*/
public interface MessageSender {
- public long sendToGroup(ClusteringCommand msg) throws ClusteringFault;
+ public void sendToGroup(ClusteringCommand msg) throws ClusteringFault;
public void sendToSelf(ClusteringCommand msg) throws ClusteringFault;
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextClusteringCommand.java Fri Jan 11 09:17:19 2008
@@ -25,15 +25,6 @@
public abstract class ContextClusteringCommand extends ClusteringCommand {
- protected String uniqueId;
-
public abstract void execute(ConfigurationContext configContext) throws ClusteringFault;
- public String getUniqueId() {
- return uniqueId;
- }
-
- public void setUniqueId(String uniqueId) {
- this.uniqueId = uniqueId;
- }
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/ContextManager.java Fri Jan 11 09:17:19 2008
@@ -34,11 +34,10 @@
* This could be addition of new properties, modifications of existing properties or
* removal of properties.
*
- * @param context The AbstractContext containing the properties to be replicated
- * @return The UUID of the message that was sent to the group communications framework
+ * @param context The context to be replicated
* @throws ClusteringFault If replication fails
*/
- String updateContext(AbstractContext context) throws ClusteringFault;
+ void updateContext(AbstractContext context) throws ClusteringFault;
/**
* This method is called when one need to update/replicate only certains properties in the
@@ -46,10 +45,9 @@
*
* @param context The AbstractContext containing the properties to be replicated
* @param propertyNames The names of the specific properties that should be replicated
- * @return The UUID of the message that was sent to the group communications framework
* @throws ClusteringFault If replication fails
*/
- String updateContext(AbstractContext context, String[] propertyNames) throws ClusteringFault;
+ void updateContext(AbstractContext context, String[] propertyNames) throws ClusteringFault;
/**
* This method is called when properties in a collection of {@link AbstractContext}s are updated.
@@ -57,19 +55,17 @@
* removal of properties.
*
* @param contexts The AbstractContexts containing the properties to be replicated
- * @return The UUID of the message that was sent to the group communications framework
* @throws ClusteringFault If replication fails
*/
- String updateContexts(AbstractContext[] contexts) throws ClusteringFault;
+ void updateContexts(AbstractContext[] contexts) throws ClusteringFault;
/**
* This method is called when {@link AbstractContext} is removed from the system
*
* @param context The AbstractContext to be removed
- * @return The UUID of the message that was sent to the group communications framework
* @throws ClusteringFault If context removal fails
*/
- String removeContext(AbstractContext context) throws ClusteringFault;
+ void removeContext(AbstractContext context) throws ClusteringFault;
/**
* @param context AbstractContext
@@ -85,7 +81,7 @@
* false - otherwise
* @throws ClusteringFault If an error occurs while checking whether a message is ACKed
*/
- boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault;
+// boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault;
/**
* @param listener ContextManagerListener
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java?rev=611239&r1=611238&r2=611239&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/context/Replicator.java Fri Jan 11 09:17:19 2008
@@ -20,7 +20,6 @@
package org.apache.axis2.clustering.context;
import org.apache.axis2.clustering.ClusterManager;
-import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
@@ -79,10 +78,7 @@
if (!contexts.isEmpty()) {
AbstractContext[] contextArray =
(AbstractContext[]) contexts.toArray(new AbstractContext[contexts.size()]);
- String msgUUID = contextManager.updateContexts(contextArray);
- if (getClusterManager(msgContext).synchronizeAllMembers()) {
- waitForACKs(contextManager, msgUUID, msgContext.getRootContext());
- }
+ contextManager.updateContexts(contextArray);
}
}
@@ -99,10 +95,7 @@
log.debug("Going to replicate state in " + abstractContext + "...");
ContextManager contextManager = getContextManager(abstractContext);
if (!abstractContext.getPropertyDifferences().isEmpty()) {
- String msgUUID = contextManager.updateContext(abstractContext);
- if (getClusterManager(abstractContext).synchronizeAllMembers()) {
- waitForACKs(contextManager, msgUUID, abstractContext.getRootContext());
- }
+ contextManager.updateContext(abstractContext);
}
}
@@ -121,12 +114,7 @@
}
log.debug("Going to replicate selected properties in " + abstractContext + "...");
ContextManager contextManager = getContextManager(abstractContext);
- String msgUUID = contextManager.updateContext(abstractContext, propertyNames);
- if (msgUUID != null) {
- if (getClusterManager(abstractContext).synchronizeAllMembers()) {
- waitForACKs(contextManager, msgUUID, abstractContext.getRootContext());
- }
- }
+ contextManager.updateContext(abstractContext, propertyNames);
}
private static ClusterManager getClusterManager(AbstractContext abstractContext) {
@@ -168,33 +156,5 @@
ClusterManager clusterManager =
messageContext.getRootContext().getAxisConfiguration().getClusterManager();
return clusterManager != null && clusterManager.getContextManager() != null;
- }
-
- private static void waitForACKs(ContextManager contextManager,
- String msgUUID,
- ConfigurationContext configCtx) throws ClusteringFault {
-
- long start = System.currentTimeMillis();
-
- // Wait till all members have ACKed receipt & successful processing of
- // the message with UUID 'msgUUID'
- do {
-
- // Wait sometime before checking whether message is ACKed
- try {
- Long tts =
- (Long) configCtx.getPropertyNonReplicable(ClusteringConstants.TIME_TO_SEND);
- if (tts == null) {
- Thread.sleep(5);
- } else if (tts.longValue() >= 0) {
- Thread.sleep(tts.longValue() + 5); // Time to recv ACK + time in queue & processing replication request
- }
- } catch (InterruptedException ignored) {
- }
- if (System.currentTimeMillis() - start > 45000) {
- throw new ClusteringFault("ACKs not received from all members within 45 sec. " +
- "Aborting wait.");
- }
- } while (!contextManager.isMessageAcknowledged(msgUUID));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org