You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by pr...@apache.org on 2008/01/15 17:22:27 UTC
svn commit: r612147 [5/17] - in /webservices/axis2/branches/java/jaxws21: ./
modules/adb-codegen/ modules/adb-codegen/src/org/apache/axis2/schema/
modules/adb-codegen/src/org/apache/axis2/schema/template/
modules/adb-codegen/src/org/apache/axis2/schema...
Modified: webservices/axis2/branches/java/jaxws21/modules/addressing/src/org/apache/axis2/handlers/addressing/AddressingOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/addressing/src/org/apache/axis2/handlers/addressing/AddressingOutHandler.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/addressing/src/org/apache/axis2/handlers/addressing/AddressingOutHandler.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/addressing/src/org/apache/axis2/handlers/addressing/AddressingOutHandler.java Tue Jan 15 08:21:22 2008
@@ -38,6 +38,8 @@
import org.apache.axis2.addressing.i18n.AddressingMessages;
import org.apache.axis2.client.Options;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisEndpoint;
+import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.axis2.util.JavaUtils;
@@ -194,7 +196,7 @@
String messageID = messageContextOptions.getMessageId();
if (messageID != null && !isAddressingHeaderAlreadyAvailable(WSA_MESSAGE_ID, false))
{//optional
- ArrayList attributes = (ArrayList)messageContext.getProperty(
+ ArrayList attributes = (ArrayList)messageContext.getLocalProperty(
AddressingConstants.MESSAGEID_ATTRIBUTES);
createSOAPHeaderBlock(messageID, WSA_MESSAGE_ID, attributes);
}
@@ -210,27 +212,31 @@
if (action == null || action.length()==0) {
if (messageContext.getAxisOperation() != null) {
action = messageContext.getAxisOperation().getOutputAction();
+ if(action!=null){
+ // Set this action back to obviate possible action mismatch problems
+ messageContext.setWSAAction(action);
+ }
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace(messageContext.getLogIDString() +
" processWSAAction: action from AxisOperation: " + action);
}
}
- }
-
- // Use the correct fault action for the selected namespace
- if(isFinalAddressingNamespace){
- if(Submission.WSA_FAULT_ACTION.equals(action)){
- action = Final.WSA_FAULT_ACTION;
- messageContextOptions.setAction(action);
- }
}else{
- if(Final.WSA_FAULT_ACTION.equals(action)){
- action = Submission.WSA_FAULT_ACTION;
- messageContextOptions.setAction(action);
- }else if(Final.WSA_SOAP_FAULT_ACTION.equals(action)){
- action = Submission.WSA_FAULT_ACTION;
- messageContextOptions.setAction(action);
- }
+ // Use the correct fault action for the selected namespace
+ if(isFinalAddressingNamespace){
+ if(Submission.WSA_FAULT_ACTION.equals(action)){
+ action = Final.WSA_FAULT_ACTION;
+ messageContextOptions.setAction(action);
+ }
+ }else{
+ if(Final.WSA_FAULT_ACTION.equals(action)){
+ action = Submission.WSA_FAULT_ACTION;
+ messageContextOptions.setAction(action);
+ }else if(Final.WSA_SOAP_FAULT_ACTION.equals(action)){
+ action = Submission.WSA_FAULT_ACTION;
+ messageContextOptions.setAction(action);
+ }
+ }
}
// If we need to add a wsa:Action header
@@ -257,7 +263,7 @@
" processWSAAction: Adding action to header: " + action);
}
// Otherwise just add the header
- ArrayList attributes = (ArrayList)messageContext.getProperty(
+ ArrayList attributes = (ArrayList)messageContext.getLocalProperty(
AddressingConstants.ACTION_ATTRIBUTES);
createSOAPHeaderBlock(action, WSA_ACTION, attributes);
}
@@ -362,7 +368,7 @@
}
createSOAPHeaderBlock(address, WSA_TO, epr.getAddressAttributes());
}
- processToEPRReferenceInformation(epr.getAllReferenceParameters(), header);
+ processToEPRReferenceInformation(epr.getAllReferenceParameters());
}
}
@@ -432,22 +438,46 @@
* @param parent is the element to which the referenceparameters should be
* attached
*/
- private void processToEPRReferenceInformation(Map referenceInformation, OMElement parent) {
- if (referenceInformation != null && parent != null) {
+ private void processToEPRReferenceInformation(Map referenceInformation) {
+ if (referenceInformation != null) {
if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
log.trace("processToEPRReferenceInformation: " + referenceInformation);
}
Iterator iterator = referenceInformation.values().iterator();
while (iterator.hasNext()) {
OMElement omElement = (OMElement)iterator.next();
- OMElement newElement = ElementHelper.importOMElement(omElement, parent.getOMFactory());
+ OMElement newElement = ElementHelper.importOMElement(omElement, header.getOMFactory());
if (isFinalAddressingNamespace) {
newElement.addAttribute(Final.WSA_IS_REFERENCE_PARAMETER_ATTRIBUTE,
Final.WSA_TYPE_ATTRIBUTE_VALUE,
addressingNamespaceObject);
}
- parent.addChild(newElement);
+ header.addChild(newElement);
}
+ }
+ // Now add reference parameters we found in the WSDL (if any)
+ AxisService service = messageContext.getAxisService();
+ if(service != null){
+ AxisEndpoint endpoint = service.getEndpoint(service.getEndpointName());
+ if(endpoint != null){
+ ArrayList referenceparameters = (ArrayList) endpoint.getParameterValue(REFERENCE_PARAMETER_PARAMETER);
+ if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
+ log.trace("processToEPRReferenceInformation: Reference Parameters from WSDL:" + referenceparameters);
+ }
+ if(referenceparameters!=null){
+ Iterator iterator = referenceparameters.iterator();
+ while (iterator.hasNext()) {
+ OMElement omElement = (OMElement)iterator.next();
+ OMElement newElement = ElementHelper.importOMElement(omElement, header.getOMFactory());
+ if (isFinalAddressingNamespace) {
+ newElement.addAttribute(Final.WSA_IS_REFERENCE_PARAMETER_ATTRIBUTE,
+ Final.WSA_TYPE_ATTRIBUTE_VALUE,
+ addressingNamespaceObject);
+ }
+ header.addChild(newElement);
+ }
+ }
+ }
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/pom.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/pom.xml?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/pom.xml (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/pom.xml Tue Jan 15 08:21:22 2008
@@ -63,5 +63,21 @@
</excludes>
</resource>
</resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <inherited>true</inherited>
+ <configuration>
+ <skip>false</skip>
+ <excludes>
+ <exclude>**/UpdateStateTest.java</exclude>
+ <exclude>**/ConfigurationManagerTest.java</exclude>
+ </excludes>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
</build>
</project>
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/ClusteringUtils.java Tue Jan 15 08:21:22 2008
@@ -25,6 +25,7 @@
import javax.activation.DataHandler;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Random;
@@ -63,6 +64,9 @@
} else {
serviceArchive = new File(axis2Repo + File.separator + "services" +
File.separator + serviceGroupName);
+ }
+ if(!serviceArchive.exists()){
+ throw new FileNotFoundException("File " + serviceArchive + " not found");
}
AxisServiceGroup asGroup =
DeploymentEngine.loadServiceGroup(serviceArchive, configCtx);
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/configuration/DefaultConfigurationManager.java Tue Jan 15 08:21:22 2008
@@ -190,7 +190,9 @@
public void setConfigurationContext(ConfigurationContext configurationContext) {
this.configurationContext = configurationContext;
- listener.setConfigurationContext(configurationContext);
+ if (listener != null) {
+ listener.setConfigurationContext(configurationContext);
+ }
}
public void addParameter(Parameter param) throws AxisFault {
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ClusteringContextListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ClusteringContextListener.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ClusteringContextListener.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ClusteringContextListener.java Tue Jan 15 08:21:22 2008
@@ -47,7 +47,7 @@
try {
sender.sendToGroup(command);
} catch (ClusteringFault e) {
- log.error(e);
+ log.error("Cannot send context removed message to cluster", e);
}
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/ContextClusteringCommandFactory.java Tue Jan 15 08:21:22 2008
@@ -19,13 +19,13 @@
package org.apache.axis2.clustering.context;
import org.apache.axiom.om.util.UUIDGenerator;
+import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
+import org.apache.axis2.clustering.context.commands.DeleteServiceGroupContextCommand;
import org.apache.axis2.clustering.context.commands.UpdateConfigurationContextCommand;
import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
import org.apache.axis2.clustering.context.commands.UpdateServiceContextCommand;
import org.apache.axis2.clustering.context.commands.UpdateServiceGroupContextCommand;
-import org.apache.axis2.clustering.context.commands.DeleteServiceGroupContextCommand;
-import org.apache.axis2.clustering.tribes.AckManager;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.PropertyDifference;
@@ -42,15 +42,15 @@
import java.util.Map;
/**
- *
+ *
*/
public final class ContextClusteringCommandFactory {
private static final Log log = LogFactory.getLog(ContextClusteringCommandFactory.class);
public static ContextClusteringCommandCollection
- getCommandCollection(AbstractContext[] contexts,
- Map excludedReplicationPatterns) {
+ getCommandCollection(AbstractContext[] contexts,
+ Map excludedReplicationPatterns) {
ArrayList commands = new ArrayList(contexts.length);
ContextClusteringCommandCollection collection =
@@ -63,14 +63,12 @@
commands.add(cmd);
}
}
- collection.setUniqueId(UUIDGenerator.getUUID());
- AckManager.addInitialAcknowledgement(collection);
return collection;
}
/**
- * @param context
- * @param excludedPropertyPatterns
+ * @param context The context
+ * @param excludedPropertyPatterns The property patterns to be excluded
* @param includeAllProperties True - Include all properties,
* False - Include only property differences
* @return ContextClusteringCommand
@@ -79,6 +77,43 @@
Map excludedPropertyPatterns,
boolean includeAllProperties) {
+ UpdateContextCommand cmd = toUpdateContextCommand(context);
+ if (cmd != null) {
+ fillProperties(cmd,
+ context,
+ excludedPropertyPatterns,
+ includeAllProperties);
+ if (cmd.isPropertiesEmpty()) {
+ cmd = null;
+ }
+ }
+
+ synchronized (context) {
+ context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
+ }
+ return cmd;
+ }
+
+
+ public static ContextClusteringCommand getUpdateCommand(AbstractContext context,
+ String[] propertyNames)
+ throws ClusteringFault {
+
+ UpdateContextCommand cmd = toUpdateContextCommand(context);
+ if (cmd != null) {
+ fillProperties(cmd, context, propertyNames);
+ if (cmd.isPropertiesEmpty()) {
+ cmd = null;
+ }
+ }
+
+ synchronized (context) {
+ context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
+ }
+ return cmd;
+ }
+
+ private static UpdateContextCommand toUpdateContextCommand(AbstractContext context) {
UpdateContextCommand cmd = null;
if (context instanceof ConfigurationContext) {
cmd = new UpdateConfigurationContextCommand();
@@ -98,29 +133,13 @@
updateServiceCmd.setServiceGroupContextId(serviceCtx.getServiceGroupContext().getId());
updateServiceCmd.setServiceName(serviceCtx.getAxisService().getName());
}
- if (cmd != null) {
- cmd.setUniqueId(UUIDGenerator.getUUID());
- fillProperties(cmd,
- context,
- excludedPropertyPatterns,
- includeAllProperties);
- if (cmd.isPropertiesEmpty()) {
- cmd = null;
- } else {
- AckManager.addInitialAcknowledgement(cmd);
- }
- }
-
- synchronized (context) {
- context.clearPropertyDifferences(); // Once we send the diffs, we should clear the diffs
- }
return cmd;
}
/**
- * @param updateCmd
- * @param context
- * @param excludedPropertyPatterns
+ * @param updateCmd The command
+ * @param context The context
+ * @param excludedPropertyPatterns The property patterns to be excluded from replication
* @param includeAllProperties True - Include all properties,
* False - Include only property differences
*/
@@ -133,18 +152,18 @@
Map diffs = context.getPropertyDifferences();
for (Iterator iter = diffs.keySet().iterator(); iter.hasNext();) {
String key = (String) iter.next();
- Object prop = context.getPropertyNonReplicable(key);
+ PropertyDifference diff = (PropertyDifference) diffs.get(key);
+ Object value = diff.getValue();
- // First check whether it is serializable
- if (prop instanceof Serializable) {
+ if (value instanceof Serializable) {
// Next check whether it matches an excluded pattern
if (!isExcluded(key,
context.getClass().getName(),
excludedPropertyPatterns)) {
- log.debug("sending property =" + key + "-" + prop);
- PropertyDifference diff = (PropertyDifference) diffs.get(key);
- diff.setValue(prop);
+ if (log.isDebugEnabled()) {
+ log.debug("sending property =" + key + "-" + value);
+ }
updateCmd.addProperty(diff);
}
}
@@ -154,14 +173,15 @@
synchronized (context) {
for (Iterator iter = context.getPropertyNames(); iter.hasNext();) {
String key = (String) iter.next();
- Object prop = context.getPropertyNonReplicable(key);
- if (prop instanceof Serializable) { // First check whether it is serializable
+ Object value = context.getPropertyNonReplicable(key);
+ if (value instanceof Serializable) {
// Next check whether it matches an excluded pattern
- if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns))
- {
- log.debug("sending property =" + key + "-" + prop);
- PropertyDifference diff = new PropertyDifference(key, prop, false);
+ if (!isExcluded(key, context.getClass().getName(), excludedPropertyPatterns)) {
+ if (log.isDebugEnabled()) {
+ log.debug("sending property =" + key + "-" + value);
+ }
+ PropertyDifference diff = new PropertyDifference(key, value, false);
updateCmd.addProperty(diff);
}
}
@@ -170,24 +190,53 @@
}
}
+ private static void fillProperties(UpdateContextCommand updateCmd,
+ AbstractContext context,
+ String[] propertyNames) throws ClusteringFault {
+ synchronized (context) {
+ Map diffs = context.getPropertyDifferences();
+ for (int i = 0; i < propertyNames.length; i++) {
+ String key = propertyNames[i];
+ Object prop = context.getPropertyNonReplicable(key);
+
+ // First check whether it is serializable
+ if (prop instanceof Serializable) {
+ if (log.isDebugEnabled()) {
+ log.debug("sending property =" + key + "-" + prop);
+ }
+ PropertyDifference diff = (PropertyDifference) diffs.get(key);
+ diff.setValue(prop);
+ updateCmd.addProperty(diff);
+ } else {
+ String msg =
+ "Trying to replicate non-serializable property " + key +
+ " in context " + context;
+ throw new ClusteringFault(msg);
+ }
+ }
+ }
+ }
+
private static boolean isExcluded(String propertyName,
String ctxClassName,
Map excludedPropertyPatterns) {
- // First check in the default excludes
- List defaultExcludes =
+ // Check in the excludes list specific to the context
+ List specificExcludes =
+ (List) excludedPropertyPatterns.get(ctxClassName);
+ boolean isExcluded = false;
+ if (specificExcludes != null) {
+ isExcluded = isExcluded(specificExcludes, propertyName);
+ }
+ if (!isExcluded) {
+ // check in the default excludes
+ List defaultExcludes =
(List) excludedPropertyPatterns.get(DeploymentConstants.TAG_DEFAULTS);
- if (defaultExcludes == null) {
- return false;
- }
- if (isExcluded(defaultExcludes, propertyName)) {
- return true;
- } else {
- // If not, check in the excludes list specific to the context
- List specificExcludes =
- (List) excludedPropertyPatterns.get(ctxClassName);
- return isExcluded(specificExcludes, propertyName);
+ if (defaultExcludes != null) {
+ isExcluded = isExcluded(defaultExcludes, propertyName);
+ }
}
+ return isExcluded;
}
private static boolean isExcluded(List list, String propertyName) {
@@ -214,11 +263,10 @@
if (abstractContext instanceof ServiceGroupContext) {
ServiceGroupContext sgCtx = (ServiceGroupContext) abstractContext;
DeleteServiceGroupContextCommand cmd = new DeleteServiceGroupContextCommand();
- cmd.setUniqueId(UUIDGenerator.getUUID());
cmd.setServiceGroupContextId(sgCtx.getId());
-
+
return cmd;
- }
+ }
return null;
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManager.java Tue Jan 15 08:21:22 2008
@@ -22,9 +22,7 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
-import org.apache.axis2.clustering.tribes.AckManager;
import org.apache.axis2.clustering.tribes.ChannelSender;
import org.apache.axis2.context.AbstractContext;
import org.apache.axis2.context.ConfigurationContext;
@@ -32,16 +30,20 @@
import org.apache.axis2.context.ServiceGroupContext;
import org.apache.axis2.description.Parameter;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
public class DefaultContextManager implements ContextManager {
private ConfigurationContext configContext;
+ private ContextManagerListener listener;
private Map parameters = new HashMap();
private ChannelSender sender;
- private ContextReplicationProcessor processor = new ContextReplicationProcessor();
private Map excludedReplicationPatterns = new HashMap();
@@ -53,30 +55,35 @@
public DefaultContextManager() {
}
- public String updateContext(AbstractContext context) throws ClusteringFault {
+ public void updateContext(AbstractContext context) throws ClusteringFault {
ContextClusteringCommand cmd =
ContextClusteringCommandFactory.getUpdateCommand(context,
excludedReplicationPatterns,
false);
if (cmd != null) {
- processor.process(cmd);
- return cmd.getUniqueId();
+ sender.sendToGroup(cmd);
}
- return null;
}
- public String updateContexts(AbstractContext[] contexts) throws ClusteringFault {
+ public void updateContext(AbstractContext context,
+ String[] propertyNames) throws ClusteringFault {
+ ContextClusteringCommand cmd =
+ ContextClusteringCommandFactory.getUpdateCommand(context, propertyNames);
+ if (cmd != null) {
+ sender.sendToGroup(cmd);
+ }
+ }
+
+ public void updateContexts(AbstractContext[] contexts) throws ClusteringFault {
ContextClusteringCommandCollection cmd =
ContextClusteringCommandFactory.getCommandCollection(contexts,
excludedReplicationPatterns);
- processor.process(cmd);
- return cmd.getUniqueId();
+ sender.sendToGroup(cmd);
}
- public String removeContext(AbstractContext context) throws ClusteringFault {
+ public void removeContext(AbstractContext context) throws ClusteringFault {
ContextClusteringCommand cmd = ContextClusteringCommandFactory.getRemoveCommand(context);
- processor.process(cmd);
- return cmd.getUniqueId();
+ sender.sendToGroup(cmd);
}
public boolean isContextClusterable(AbstractContext context) {
@@ -85,22 +92,18 @@
(context instanceof ServiceGroupContext);
}
- public boolean isMessageAcknowledged(String messageUniqueId) throws ClusteringFault {
- return AckManager.isMessageAcknowledged(messageUniqueId, sender);
- }
-
- public void process(ContextClusteringCommand command) throws ClusteringFault {
- command.execute(configContext);
- }
-
public void setContextManagerListener(ContextManagerListener listener) {
+ this.listener = listener;
if (configContext != null) {
- listener.setConfigurationContext(configContext);
+ this.listener.setConfigurationContext(configContext);
}
}
public void setConfigurationContext(ConfigurationContext configurationContext) {
this.configContext = configurationContext;
+ if (listener != null) {
+ listener.setConfigurationContext(configContext);
+ }
}
public void setReplicationExcludePatterns(String contextType, List patterns) {
@@ -140,43 +143,4 @@
throw new UnsupportedOperationException();
}
// ---------------------------------------------------------------------------------------------
-
- private class ContextReplicationProcessor {
- public void process(final ContextClusteringCommand cmd) throws ClusteringFault {
-
- // If the sender is NULL, it means the TribesClusterManager is still being initialized
- // So we need to busy wait.
- if (sender == null) {
- Thread processorThread = new Thread("ProcessorThread") {
- public void run() {
- do {
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } while (sender == null);
- try {
- long tts = sender.sendToGroup(cmd);
- configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
- new Long(tts));
- } catch (ClusteringFault clusteringFault) {
- AckManager.removeMessage(cmd.getUniqueId());
- throw new RuntimeException(clusteringFault);
- }
- }
- };
- processorThread.start();
- } else {
- try {
- long tts = sender.sendToGroup(cmd);
- configContext.setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
- new Long(tts));
- } catch (ClusteringFault clusteringFault) {
- AckManager.removeMessage(cmd.getUniqueId());
- throw clusteringFault;
- }
- }
- }
- }
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManagerListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManagerListener.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManagerListener.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/DefaultContextManagerListener.java Tue Jan 15 08:21:22 2008
@@ -27,12 +27,9 @@
public class DefaultContextManagerListener implements ContextManagerListener {
private ConfigurationContext configurationContext;
- private static final Log log = LogFactory.getLog(DefaultContextManagerListener.class);
public void contextUpdated(ContextClusteringCommand message) throws ClusteringFault {
- log.debug("Enter: DefaultContextManagerListener::contextRemoved");
message.execute(configurationContext);
- log.debug("Exit: DefaultContextManagerListener::contextRemoved");
}
public void setConfigurationContext(ConfigurationContext configurationContext) {
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/PropertyUpdater.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/PropertyUpdater.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/PropertyUpdater.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/PropertyUpdater.java Tue Jan 15 08:21:22 2008
@@ -36,7 +36,9 @@
private Map properties;
public void updateProperties(AbstractContext abstractContext) {
- log.debug("Updating props in " + abstractContext);
+ if (log.isDebugEnabled()) {
+ log.debug("Updating props in " + abstractContext);
+ }
if (abstractContext != null) {
for (Iterator iter = properties.keySet().iterator(); iter.hasNext();) {
String key = (String) iter.next();
@@ -46,8 +48,10 @@
abstractContext.removePropertyNonReplicable(key);
} else { // it is updated/added
abstractContext.setNonReplicableProperty(key, propDiff.getValue());
- log.debug("Added prop=" + key + ", value=" + propDiff.getValue() +
- " to context " + abstractContext);
+ if (log.isDebugEnabled()) {
+ log.debug("Added prop=" + key + ", value=" + propDiff.getValue() +
+ " to context " + abstractContext);
+ }
}
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/ContextClusteringCommandCollection.java Tue Jan 15 08:21:22 2008
@@ -45,6 +45,6 @@
}
public String toString() {
- return "ContextClusteringCommandCollection(" + uniqueId + ")";
+ return "ContextClusteringCommandCollection";
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateConfigurationContextCommand.java Tue Jan 15 08:21:22 2008
@@ -31,6 +31,6 @@
}
public String toString() {
- return "UpdateConfigurationContextCommand(" + uniqueId + ")";
+ return "UpdateConfigurationContextCommand";
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateContextCommand.java Tue Jan 15 08:21:22 2008
@@ -42,7 +42,7 @@
public void addProperty(PropertyDifference diff) {
if (propertyUpdater.getProperties() == null) {
propertyUpdater.setProperties(new HashMap());
- }
+ }
propertyUpdater.addContextProperty(diff);
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceContextCommand.java Tue Jan 15 08:21:22 2008
@@ -53,7 +53,9 @@
}
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
- log.debug("Updating service context properties...");
+ if (log.isDebugEnabled()) {
+ log.debug("Updating service context properties...");
+ }
ServiceGroupContext sgCtx =
configurationContext.getServiceGroupContext(serviceGroupContextId);
if (sgCtx != null) {
@@ -100,6 +102,6 @@
}
public String toString() {
- return "UpdateServiceContextCommand(" + uniqueId + ")";
+ return "UpdateServiceContextCommand";
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/context/commands/UpdateServiceGroupContextCommand.java Tue Jan 15 08:21:22 2008
@@ -66,11 +66,13 @@
sgCtx.setId(serviceGroupContextId);
configContext.addServiceGroupContextIntoSoapSessionTable(sgCtx); // TODO: Check this
}
- log.debug("###### Gonna update SG prop in " + serviceGroupContextId + "===" + sgCtx);
+ if (log.isDebugEnabled()) {
+ log.debug("Gonna update SG prop in " + serviceGroupContextId + "===" + sgCtx);
+ }
propertyUpdater.updateProperties(sgCtx);
}
public String toString() {
- return "UpdateServiceGroupContextCommand(" + uniqueId + ")";
+ return "UpdateServiceGroupContextCommand";
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationCommand.java Tue Jan 15 08:21:22 2008
@@ -17,6 +17,8 @@
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisServiceGroup;
import org.apache.axis2.engine.AxisConfiguration;
@@ -37,7 +39,20 @@
AxisConfiguration axisConfig = configCtx.getAxisConfiguration();
for (Iterator iter = axisConfig.getServiceGroups(); iter.hasNext();) {
AxisServiceGroup serviceGroup = (AxisServiceGroup) iter.next();
- serviceGroupNames.add(serviceGroup.getServiceGroupName());
+ boolean excludeSG = false;
+ for (Iterator serviceIter = serviceGroup.getServices(); serviceIter.hasNext();) {
+ AxisService service = (AxisService) serviceIter.next();
+ if (service.getParameter(AxisModule.MODULE_SERVICE) != null ||
+ service.isClientSide()) { // No need to send services deployed through modules or client side services
+ excludeSG = true;
+ break;
+ }
+ }
+
+ //TODO: Exclude all services loaded from modules. How to handle data services etc.?
+ if (!excludeSG) {
+ serviceGroupNames.add(serviceGroup.getServiceGroupName());
+ }
}
this.serviceGroupNames =
(String[]) serviceGroupNames.toArray(new String[serviceGroupNames.size()]);
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java Tue Jan 15 08:21:22 2008
@@ -24,6 +24,8 @@
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisModule;
import org.apache.axis2.AxisFault;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.FileNotFoundException;
import java.util.Iterator;
@@ -33,6 +35,8 @@
*/
public class GetConfigurationResponseCommand extends ControlCommand {
+ private static final Log log = LogFactory.getLog(GetConfigurationResponseCommand.class);
+
private String[] serviceGroups;
public void execute(ConfigurationContext configContext) throws ClusteringFault {
@@ -40,7 +44,10 @@
// Run this code only if this node is not already initialized
if (configContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ getPropertyNonReplicable(ClusteringConstants.RECD_CONFIG_INIT_MSG) == null) {
+ log.info("Received configuration initialization message");
+ configContext.
+ setNonReplicableProperty(ClusteringConstants.RECD_CONFIG_INIT_MSG, "true");
if (serviceGroups != null) {
// Load all the service groups that are sent by the neighbour
@@ -59,6 +66,8 @@
}
}
+ //TODO: We support only AAR files for now
+
// Unload all service groups which were not sent by the neighbour,
// but have been currently loaded
for (Iterator iter = axisConfig.getServiceGroups(); iter.hasNext();) {
@@ -79,9 +88,9 @@
serviceIter.hasNext();) {
AxisService service = (AxisService) serviceIter.next();
if (service.isClientSide() ||
- service.getParameter(AxisModule.MODULE_SERVICE) != null) {
+ service.getParameter(AxisModule.MODULE_SERVICE) != null) { // Do not unload service groups containing client side services or ones deployed from within modules
mustUnloadServiceGroup = false;
- break; // Do not unload service groups containing client side services
+ break;
}
}
if (mustUnloadServiceGroup) {
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Tue Jan 15 08:21:22 2008
@@ -22,24 +22,30 @@
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
*
*/
public class GetStateResponseCommand extends ControlCommand {
- private ContextClusteringCommand[] commands;
+ private static final Log log = LogFactory.getLog(GetStateResponseCommand.class);
- public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ private ContextClusteringCommand[] commands;
+ public void execute(ConfigurationContext configContext) throws ClusteringFault {
+ log.info("Received state initialization message");
+
// Run this code only if this node is not already initialized
- if (configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
- configurationContext.
- setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
+ if (configContext.
+ getPropertyNonReplicable(ClusteringConstants.RECD_STATE_INIT_MSG) == null) {
+ configContext.
+ setNonReplicableProperty(ClusteringConstants.RECD_STATE_INIT_MSG, "true");
+// log.info("Received state initialization message");
if (commands != null) {
for (int i = 0; i < commands.length; i++) {
- commands[i].execute(configurationContext);
+ commands[i].execute(configContext);
}
}
}
Copied: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (from r611653, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?p2=webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java&r1=611653&r2=612147&rev=612147&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Tue Jan 15 08:21:22 2008
@@ -15,45 +15,21 @@
*/
package org.apache.axis2.clustering.tribes;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.RemoteProcessException;
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ByteMessage;
-import org.apache.catalina.tribes.util.UUIDGenerator;
-import org.apache.catalina.tribes.io.ChannelData;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.Membership;
-import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.axis2.engine.AxisConfiguration;
-import org.apache.axis2.description.AxisServiceGroup;
-import org.apache.axis2.description.AxisModule;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TimerTask;
-import java.util.Timer;
import java.util.Iterator;
import java.util.List;
-import java.util.ArrayList;
-import java.net.Socket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
-import java.net.ConnectException;
-import java.io.Serializable;
+import java.util.Map;
/**
* Message interceptor for handling at-most-once message processing semantics
*/
-public class AtMostOnceInterceptor extends ChannelInterceptorBase {
+public class AtMostOnceInterceptor extends ChannelInterceptorBase {
private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class);
private static final Map receivedMessages = new HashMap();
@@ -61,39 +37,61 @@
/**
* The time a message lives in the receivedMessages Map
*/
- private static final int TIMEOUT = 60 * 1000;
+ private static final int TIMEOUT = 5 * 60 * 1000;
public AtMostOnceInterceptor() {
+ Thread cleanupThread = new Thread(new MessageCleanupTask());
+ cleanupThread.setPriority(Thread.MIN_PRIORITY);
+ cleanupThread.start();
+ }
- TimerTask cleanupTask = new TimerTask() {
- public void run() {
- List toBeRemoved = new ArrayList();
- for (Iterator iterator = receivedMessages.keySet().iterator();
- iterator.hasNext();) {
- ChannelMessage msg = (ChannelMessage) iterator.next();
- long arrivalTime = ((Long) receivedMessages.get(msg)).longValue();
- if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) {
- toBeRemoved.add(msg);
- }
+ public void messageReceived(ChannelMessage msg) {
+ synchronized (receivedMessages) {
+ if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it
+ receivedMessages.put(msg, new Long(System.currentTimeMillis()));
+ super.messageReceived(msg);
+ } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
+ log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
+ }
+ }
+ }
+
+ private class MessageCleanupTask implements Runnable {
+
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(TIMEOUT);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
- ChannelMessage msg = (ChannelMessage) iterator.next();
- receivedMessages.remove(msg);
- if (log.isDebugEnabled()) {
- log.debug("Cleaned up message ");
+ try {
+ List toBeRemoved = new ArrayList();
+ Thread.yield();
+ synchronized (receivedMessages) {
+ for (Iterator iterator = receivedMessages.keySet().iterator();
+ iterator.hasNext();) {
+ ChannelMessage msg = (ChannelMessage) iterator.next();
+ long arrivalTime = ((Long) receivedMessages.get(msg)).longValue();
+ if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) {
+ toBeRemoved.add(msg);
+ if(toBeRemoved.size() > 10000){ // Do not allow this thread to run for too long
+ break;
+ }
+ }
+ }
+ for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
+ ChannelMessage msg = (ChannelMessage) iterator.next();
+ receivedMessages.remove(msg);
+ if (log.isDebugEnabled()) {
+ log.debug("Cleaned up message ");
+ }
+ }
}
+ } catch (Exception e) {
+ log.error("Exception occurred while trying to cleanup messages", e);
}
}
- };
- new Timer().scheduleAtFixedRate(cleanupTask, TIMEOUT, TIMEOUT);
- }
-
- public void messageReceived(ChannelMessage msg) {
- super.messageReceived(msg);
- if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it
- receivedMessages.put(msg, new Long(System.currentTimeMillis()));
- } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
- log.info("Duplicate message received from " + TribesUtil.getHost(msg.getAddress()));
}
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Tue Jan 15 08:21:22 2008
@@ -25,9 +25,6 @@
import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
import org.apache.axis2.clustering.context.DefaultContextManager;
-import org.apache.axis2.clustering.context.commands.ContextClusteringCommandCollection;
-import org.apache.axis2.clustering.context.commands.UpdateContextCommand;
-import org.apache.axis2.clustering.control.AckCommand;
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
import org.apache.axis2.clustering.control.GetStateResponseCommand;
@@ -37,6 +34,8 @@
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.RpcMessage;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,20 +50,17 @@
private DefaultContextManager contextManager;
private DefaultConfigurationManager configurationManager;
- private TribesControlCommandProcessor controlCommandProcessor;
- private ChannelSender channelSender;
+ private ControlCommandProcessor controlCommandProcessor;
private ConfigurationContext configurationContext;
public ChannelListener(ConfigurationContext configurationContext,
DefaultConfigurationManager configurationManager,
DefaultContextManager contextManager,
- TribesControlCommandProcessor controlCommandProcessor,
- ChannelSender sender) {
+ ControlCommandProcessor controlCommandProcessor) {
this.configurationManager = configurationManager;
this.contextManager = contextManager;
this.controlCommandProcessor = controlCommandProcessor;
- this.channelSender = sender;
this.configurationContext = configurationContext;
}
@@ -80,10 +76,21 @@
this.configurationContext = configurationContext;
}
+ /**
+ * Invoked by the channel to determine if the listener will process this message or not.
+ * @param msg Serializable
+ * @param sender Member
+ * @return boolean
+ */
public boolean accept(Serializable msg, Member sender) {
- return true;
+ return !(msg instanceof RpcMessage); // RpcMessages will not be handled by this listener
}
+ /**
+ * Receive a message from the channel
+ * @param msg Serializable
+ * @param sender - the source of the message
+ */
public void messageReceived(Serializable msg, Member sender) {
try {
AxisConfiguration configuration = configurationContext.getAxisConfiguration();
@@ -98,58 +105,45 @@
AxisModule module = (AxisModule) iter.next();
classLoaders.add(module.getModuleClassLoader());
}
-
-
byte[] message = ((ByteMessage) msg).getMessage();
msg = XByteBuffer.deserialize(message,
0,
message.length,
- (ClassLoader[])classLoaders.toArray(new ClassLoader[classLoaders.size()]));
+ (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
} catch (Exception e) {
- log.error(e);
+ String errMsg = "Cannot deserialize received message";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
}
// If the system has not yet been initialized, reject all incoming messages, except the
// GetStateResponseCommand message
if (configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
- && !(msg instanceof GetStateResponseCommand) &&
- !(msg instanceof GetConfigurationResponseCommand)) {
-
- log.warn("Received message before cluster initialization has been completed");
+ getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ log.warn("Received message " + msg +
+ " before cluster initialization has been completed from " +
+ TribesUtil.getHost(sender));
return;
}
- log.debug("Received message " + msg + " from " + TribesUtil.getHost(sender));
+ if (log.isDebugEnabled()) {
+ log.debug("Received message " + msg + " from " + TribesUtil.getHost(sender));
+ }
+
try {
processMessage(msg, sender);
} catch (Exception e) {
- log.error(e);
+ String errMsg = "Cannot process received message";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
}
}
private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
- //TODO: Reject duplicates that can be received due to retransmissions
- //TODO: ACK implosion?
-
if (msg instanceof ContextClusteringCommand && contextManager != null) {
ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
- contextManager.process(ctxCmd);
-
- // Sending ACKs for ContextClusteringCommandCollection or
- // UpdateContextCommand is sufficient
- if (msg instanceof ContextClusteringCommandCollection ||
- msg instanceof UpdateContextCommand) {
- AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
-
- // Send the ACK
- this.channelSender.sendToMember(ackCmd, sender);
- }
- } else if (msg instanceof ConfigurationClusteringCommand &&
- configurationManager != null) {
+ ctxCmd.execute(configurationContext);
+ } else if (msg instanceof ConfigurationClusteringCommand && configurationManager != null) {
configurationManager.process((ConfigurationClusteringCommand) msg);
- } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
- controlCommandProcessor.process((ControlCommand) msg,
- sender);
- }
+ }
}
}
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Tue Jan 15 08:21:22 2008
@@ -24,6 +24,7 @@
import org.apache.axis2.clustering.MessageSender;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.Member;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,39 +38,58 @@
private Log log = LogFactory.getLog(ChannelSender.class);
private Channel channel;
+ private boolean synchronizeAllMembers;
+ private MembershipManager membershipManager;
- public long sendToGroup(ClusteringCommand msg) throws ClusteringFault {
+ public ChannelSender(Channel channel,
+ MembershipManager membershipManager,
+ boolean synchronizeAllMembers) {
+ this.channel = channel;
+ this.membershipManager = membershipManager;
+ this.synchronizeAllMembers = synchronizeAllMembers;
+ }
+
+ public void sendToGroup(ClusteringCommand msg) throws ClusteringFault {
if (channel == null) {
- return 0;
+ return;
}
- long timeToSend = 0;
+ Member[] members = membershipManager.getMembers();
// Keep retrying, since at the point of trying to send the msg, a member may leave the group
// causing a view change. All nodes in a view should get the msg
- //TODO: Sometimes Tribes incorrectly detects that a member has left a group
- while (true) {
- if (channel.getMembers().length > 0) {
- try {
- long start = System.currentTimeMillis();
- channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
- timeToSend = System.currentTimeMillis() - start;
+ if (members.length > 0) {
+ try {
+ if (synchronizeAllMembers) {
+ channel.send(members, toByteMessage(msg),
+ Channel.SEND_OPTIONS_USE_ACK |
+ Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+ TribesClusterManager.MSG_ORDER_OPTION);
+ } else {
+ channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_ASYNCHRONOUS);
+ }
+ if (log.isDebugEnabled()) {
log.debug("Sent " + msg + " to group");
- break;
- } catch (NotSerializableException e) {
- String message = "Could not send command message " + msg +
- " to group since it is not serializable.";
- log.error(message, e);
- throw new ClusteringFault(message, e);
- } catch (Exception e) {
- String message = "Error sending command message : " + msg +
- ". Reason " + e.getMessage();
- log.warn(message, e);
}
- } else {
- break;
+ } catch (NotSerializableException e) {
+ String message = "Could not send command message " + msg +
+ " to group since it is not serializable.";
+ log.error(message, e);
+ throw new ClusteringFault(message, e);
+ } catch (ChannelException e) {
+ log.error("Could not send message to some members", e);
+ ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+ for (int i = 0; i < faultyMembers.length; i++) {
+ ChannelException.FaultyMember faultyMember = faultyMembers[i];
+ Member member = faultyMember.getMember();
+ log.error("Member " + TribesUtil.getHost(member) + " is faulty",
+ faultyMember.getCause());
+ }
+ } catch (Exception e) {
+ String message = "Error sending command message : " + msg +
+ ". Reason " + e.getMessage();
+ log.warn(message, e);
}
}
- return timeToSend;
}
private ByteMessage toByteMessage(ClusteringCommand msg) throws IOException {
@@ -89,39 +109,37 @@
channel.send(new Member[]{channel.getLocalMember(true)},
toByteMessage(msg),
Channel.SEND_OPTIONS_USE_ACK);
- log.debug("Sent " + msg + " to self");
+ if (log.isDebugEnabled()) {
+ log.debug("Sent " + msg + " to self");
+ }
} catch (Exception e) {
throw new ClusteringFault(e);
}
}
- public long sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
- long timeToSend = 0;
+ public void sendToMember(ClusteringCommand cmd, Member member) throws ClusteringFault {
try {
if (member.isReady()) {
- long start = System.currentTimeMillis();
- channel.send(new Member[]{member}, toByteMessage(cmd), Channel.SEND_OPTIONS_USE_ACK);
- timeToSend = System.currentTimeMillis() - start;
- log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
+ channel.send(new Member[]{member}, toByteMessage(cmd),
+ Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ if (log.isDebugEnabled()) {
+ log.debug("Sent " + cmd + " to " + TribesUtil.getHost(member));
+ }
}
} catch (NotSerializableException e) {
- String message = "Could not send command message to " + TribesUtil.getHost(member) +
+ String message = "Could not send command message to " + TribesUtil.getHost(member) +
" since it is not serializable.";
log.error(message, e);
throw new ClusteringFault(message, e);
+ } catch (ChannelException e) {
+ log.error("Could not send message to " + TribesUtil.getHost(member));
+ ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+ log.error("Member " + TribesUtil.getHost(member) + " is faulty",
+ faultyMembers[0].getCause());
} catch (Exception e) {
String message = "Could not send message to " + TribesUtil.getHost(member) +
". Reason " + e.getMessage();
log.warn(message, e);
}
- return timeToSend;
- }
-
- public Channel getChannel() {
- return channel;
- }
-
- public void setChannel(Channel channel) {
- this.channel = channel;
}
}
Copied: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (from r610664, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?p2=webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java&r1=610664&r2=612147&rev=612147&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Tue Jan 15 08:21:22 2008
@@ -25,21 +25,21 @@
* Responsible for managing the membership
*/
public class MembershipManager {
- private static final List members = new ArrayList();
+ private final List members = new ArrayList();
- public synchronized static void memberAdded(Member member) {
+ public synchronized void memberAdded(Member member) {
members.add(member);
}
- public synchronized static void memberDisappeared(Member member) {
+ public synchronized void memberDisappeared(Member member) {
members.remove(member);
}
- public synchronized static Member[] getMembers() {
+ public synchronized Member[] getMembers() {
return (Member[]) members.toArray(new Member[members.size()]);
}
- public synchronized static Member getLongestLivingMember() {
+ public synchronized Member getLongestLivingMember() {
Member longestLivingMember = null;
if (members.size() > 0) {
Member member0 = (Member) members.get(0);
@@ -56,8 +56,8 @@
return longestLivingMember;
}
- public synchronized static Member getRandomMember() {
- if(members.size() == 0){
+ public synchronized Member getRandomMember() {
+ if (members.size() == 0) {
return null;
}
int memberIndex = new Random().nextInt(members.size());
Modified: webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=612147&r1=612146&r2=612147&view=diff
==============================================================================
--- webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/branches/java/jaxws21/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Tue Jan 15 08:21:22 2008
@@ -22,7 +22,6 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusterManager;
-import org.apache.axis2.clustering.ClusteringCommand;
import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.RequestBlockingHandler;
@@ -31,6 +30,7 @@
import org.apache.axis2.clustering.context.ClusteringContextListener;
import org.apache.axis2.clustering.context.ContextManager;
import org.apache.axis2.clustering.context.DefaultContextManager;
+import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
import org.apache.axis2.context.ConfigurationContext;
@@ -45,8 +45,14 @@
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.catalina.tribes.transport.ReplicationTransmitter;
+import org.apache.catalina.tribes.transport.MultiPointSender;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,9 +60,9 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Random;
public class TribesClusterManager implements ClusterManager {
+ public static final int MSG_ORDER_OPTION = 512;
private static final Log log = LogFactory.getLog(TribesClusterManager.class);
private DefaultConfigurationManager configurationManager;
@@ -64,13 +70,16 @@
private HashMap parameters;
private ManagedChannel channel;
+ private RpcChannel rpcChannel;
private ConfigurationContext configurationContext;
- private TribesControlCommandProcessor controlCmdProcessor;
+ private ControlCommandProcessor controlCmdProcessor;
private ChannelListener channelListener;
+ private ChannelSender channelSender;
+ private MembershipManager membershipManager;
public TribesClusterManager() {
parameters = new HashMap();
- controlCmdProcessor = new TribesControlCommandProcessor(configurationContext);
+ controlCmdProcessor = new ControlCommandProcessor(configurationContext);
}
public ContextManager getContextManager() {
@@ -83,8 +92,6 @@
public void init() throws ClusteringFault {
- // Until the clustering stuff is properly initialized, we have to block.
- configurationContext.setProperty(ClusteringConstants.BLOCK_ALL_REQUESTS, "true");
AxisConfiguration axisConfig = configurationContext.getAxisConfiguration();
for (Iterator iterator = axisConfig.getInFlowPhases().iterator();
iterator.hasNext();) {
@@ -122,19 +129,25 @@
}
}
}
-
- ChannelSender sender = new ChannelSender();
-
- channelListener = new ChannelListener(configurationContext,
- configurationManager,
- contextManager,
- controlCmdProcessor,
- sender);
-
- controlCmdProcessor.setChannelSender(sender);
+ membershipManager = new MembershipManager();
channel = new GroupChannel();
+ channelSender = new ChannelSender(channel, membershipManager, synchronizeAllMembers());
+ channelListener = new ChannelListener(configurationContext, configurationManager,
+ contextManager, controlCmdProcessor);
+
+ // Set the maximum number of retries, if message sending to a particular node fails
+ Parameter maxRetriesParam = getParameter("maxRetries");
+ int maxRetries = 10;
+ if (maxRetriesParam != null) {
+ maxRetries = Integer.parseInt((String) maxRetriesParam.getValue());
+ }
+ ReplicationTransmitter replicationTransmitter =
+ (ReplicationTransmitter) channel.getChannelSender();
+ MultiPointSender multiPointSender = replicationTransmitter.getTransport();
+ multiPointSender.setMaxRetryAttempts(maxRetries);
- String localIP = System.getProperty("local.ip.address"); //TODO: Use ClusteringConstants.LOCAL_IP_ADDRESS
+ // Set the IP address that will be advertised by this node
+ String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
if (localIP != null) {
ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
receiver.setAddress(localIP);
@@ -148,6 +161,8 @@
} else {
domain = "apache.axis2.domain".getBytes();
}
+
+ // Add a DomainFilterInterceptor
channel.getMembershipService().setDomain(domain);
DomainFilterInterceptor dfi = new DomainFilterInterceptor();
dfi.setDomain(domain);
@@ -174,13 +189,23 @@
mcastProps.setProperty("tcpListenPort", "4000");
mcastProps.setProperty("tcpListenHost", "127.0.0.1");*/
- /*TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
- tcpFailureDetector.setPrevious(nbc);
+ // Add the OrderInterceptor to preserve sender ordering
+ OrderInterceptor orderInterceptor = new OrderInterceptor();
+ orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
+ channel.addInterceptor(orderInterceptor);
+
+ // Add an AtMostOnceInterceptor to support at-most-once message processing semantics
+ AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+ channel.addInterceptor(atMostOnceInterceptor);
+
+ // Add a reliable failure detector
+ TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+ tcpFailureDetector.setPrevious(dfi);
channel.addInterceptor(tcpFailureDetector);
- tcpFailureDetector.*/
channel.addChannelListener(channelListener);
- TribesMembershipListener membershipListener = new TribesMembershipListener();
+
+ TribesMembershipListener membershipListener = new TribesMembershipListener(membershipManager);
channel.addMembershipListener(membershipListener);
try {
channel.start(Channel.DEFAULT);
@@ -189,85 +214,106 @@
channel.stop(Channel.DEFAULT);
throw new ClusteringFault("Cannot join cluster using IP " + localHost +
". Please set an IP address other than " +
- localHost + " in your /etc/hosts file and retry.");
+ localHost + " in your /etc/hosts file or set the " +
+ ClusteringConstants.LOCAL_IP_ADDRESS +
+ " System property and retry.");
}
} catch (ChannelException e) {
throw new ClusteringFault("Error starting Tribes channel", e);
}
- sender.setChannel(channel);
-
- Member[] members = channel.getMembers();
- log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
- TribesUtil.printMembers(members);
- if (configurationManager != null) { // If configuration management is enabled, get the latest config from a neighbour
- configurationManager.setSender(sender);
- getInitializationMessage(members, sender, new GetConfigurationCommand());
- }
-
- if (contextManager != null) { // If context replication is enabled, get the latest state from a neighbour
- contextManager.setSender(sender);
+ // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
+ // picks it up. Each RPC is given a UUID, so it can correlate the request-response pair
+ RpcChannel rpcChannel =
+ new RpcChannel(domain, channel,
+ new InitializationRequestHandler(controlCmdProcessor));
+
+ log.info("Local Member " + TribesUtil.getLocalHost(channel));
+ TribesUtil.printMembers(membershipManager);
+
+ // If configuration management is enabled, get the latest config from a neighbour
+ if (configurationManager != null) {
+ configurationManager.setSender(channelSender);
+ initializeSystem(rpcChannel, new GetConfigurationCommand());
+ }
+
+ // If context replication is enabled, get the latest state from a neighbour
+ if (contextManager != null) {
+ contextManager.setSender(channelSender);
channelListener.setContextManager(contextManager);
- getInitializationMessage(members, sender, new GetStateCommand());
- ClusteringContextListener contextListener = new ClusteringContextListener(sender);
+ initializeSystem(rpcChannel, new GetStateCommand());
+ ClusteringContextListener contextListener = new ClusteringContextListener(channelSender);
configurationContext.addContextListener(contextListener);
}
+
configurationContext.
setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
- configurationContext.removeProperty(ClusteringConstants.BLOCK_ALL_REQUESTS);
}
/**
* Get some information from a neighbour. This information will be used by this node to
* initialize itself
*
- * @param members
- * @param sender
- * @param command
+ * @param rpcChannel The utility for sending RPC style messages to the channel
+ * @param command The control command to send
+ * @throws ClusteringFault If initialization code failed on this node
*/
- private void getInitializationMessage(Member[] members,
- ChannelSender sender,
- ClusteringCommand command) {
- // If there is at least one member in the Tribe, get the current initialization info from a member
- Random random = new Random();
- int numberOfTries = 0; // Don't keep on trying infinitely
+ private void initializeSystem(RpcChannel rpcChannel, ControlCommand command)
+ throws ClusteringFault {
+ // If there is at least one member in the cluster,
+ // get the current initialization info from a member
+ int numberOfTries = 0; // Don't keep on trying indefinitely
// Keep track of members to whom we already sent an initialization command
// Do not send another request to these members
List sentMembersList = new ArrayList();
- while (members.length > 0 &&
- configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
- && numberOfTries < 50) {
-
- // While there are members and GetStateResponseCommand is not received do the following
+ sentMembersList.add(TribesUtil.getLocalHost(channel));
+ Member[] members = membershipManager.getMembers();
+ if(members.length == 0) return;
+
+ while (members.length > 0 && numberOfTries < 5) {
+ Member member = (numberOfTries == 0) ?
+ membershipManager.getLongestLivingMember() : // First try to get from the longest member alive
+ membershipManager.getRandomMember(); // Else get from a random member
+ String memberHost = TribesUtil.getHost(member);
try {
- members = channel.getMembers();
- int memberIndex = random.nextInt(members.length);
- Member member = members[memberIndex];
- if (!sentMembersList.contains(TribesUtil.getHost(member))) {
- long tts = sender.sendToMember(command, member);
- configurationContext.
- setNonReplicableProperty(ClusteringConstants.TIME_TO_SEND,
- new Long(tts));
- sentMembersList.add(TribesUtil.getHost(member));
- log.debug("WAITING FOR STATE INITIALIZATION MESSAGE...");
- Thread.sleep(tts + 5);
+ if (!sentMembersList.contains(memberHost)) {
+ Response[] responses = rpcChannel.send(new Member[]{member},
+ command,
+ RpcChannel.FIRST_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
+ if (responses.length > 0) {
+ ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+ break;
+ }
+ }
+ } catch (ChannelException e) {
+ log.error("Cannot get initialization information from " +
+ memberHost + ". Will retry in 2 secs.", e);
+ sentMembersList.add(memberHost);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {
+ log.debug("Interrupted", ignored);
}
- } catch (Exception e) {
- log.error(e);
- break;
}
numberOfTries++;
+ members = membershipManager.getMembers();
+ if(numberOfTries >= members.length){
+ break;
+ }
}
}
public void setConfigurationManager(ConfigurationManager configurationManager) {
this.configurationManager = (DefaultConfigurationManager) configurationManager;
+ this.configurationManager.setSender(channelSender);
}
public void setContextManager(ContextManager contextManager) {
this.contextManager = (DefaultContextManager) contextManager;
+ this.contextManager.setSender(channelSender);
}
public void addParameter(Parameter param) throws AxisFault {
@@ -291,13 +337,8 @@
}
public boolean isParameterLocked(String parameterName) {
-
Parameter parameter = (Parameter) parameters.get(parameterName);
- if (parameter != null) {
- return parameter.isLocked();
- }
-
- return false;
+ return parameter != null && parameter.isLocked();
}
public void removeParameter(Parameter param) throws AxisFault {
@@ -308,6 +349,8 @@
log.debug("Enter: TribesClusterManager::shutdown");
if (channel != null) {
try {
+ channel.removeChannelListener(rpcChannel);
+ channel.removeChannelListener(channelListener);
channel.stop(Channel.DEFAULT);
} catch (ChannelException e) {
@@ -327,5 +370,24 @@
if (channelListener != null) {
channelListener.setConfigurationContext(configurationContext);
}
+ if (configurationManager != null) {
+ configurationManager.setConfigurationContext(configurationContext);
+ }
+ if (contextManager != null) {
+ contextManager.setConfigurationContext(configurationContext);
+ }
+ }
+
+ /**
+ * Method to check whether all members in the cluster have to be kept in sync at all times.
+ * Typically, this will require each member in the cluster to ACKnowledge receipt of a
+ * particular message, which may have a significant performance hit.
+ *
+ * @return true - if all members in the cluster should be kept in sync at all times, false
+ * otherwise
+ */
+ public boolean synchronizeAllMembers() {
+ Parameter syncAllParam = getParameter(ClusteringConstants.SYNCHRONIZE_ALL_MEMBERS);
+ return syncAllParam == null || Boolean.parseBoolean((String) syncAllParam.getValue());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org