You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/21 19:35:59 UTC
svn commit: r658777 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering:
control/wka/ tribes/
Author: azeez
Date: Wed May 21 10:35:58 2008
New Revision: 658777
URL: http://svn.apache.org/viewvc?rev=658777&view=rev
Log:
Initial stuff on WKA based membership discovery
Added:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java?rev=658777&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java Wed May 21 10:35:58 2008
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.control.wka;
+
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.context.ConfigurationContext;
+
+/**
+ *
+ */
+public class JoinGroupCommand extends ControlCommand {
+ public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ //TODO: Method implementation
+ System.out.println("### EXEC JoinGroupCommand");
+
+ //todo: send member list, add to static member list, send member joined to others
+ }
+}
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java?rev=658777&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java Wed May 21 10:35:58 2008
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.control.wka;
+
+import org.apache.catalina.tribes.io.ChannelData;
+
+/**
+ *
+ */
+public class LeaveGroupCommand extends ChannelData {
+}
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=658777&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Wed May 21 10:35:58 2008
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.control.wka;
+
+import org.apache.catalina.tribes.io.ChannelData;
+
+/**
+ *
+ */
+public class MemberJoinedCommand extends ChannelData {
+}
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=658777&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Wed May 21 10:35:58 2008
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.control.wka;
+
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.context.ConfigurationContext;
+
+/**
+ *
+ */
+public class MemberListCommand extends ControlCommand {
+ public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ //TODO: Method implementation
+ System.out.println("#### MEMBER LIST CMD");
+ }
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java?rev=658777&r1=658776&r2=658777&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java Wed May 21 10:35:58 2008
@@ -26,10 +26,12 @@
import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
import org.apache.axis2.clustering.control.GetStateResponseCommand;
+import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
import org.apache.axis2.context.ConfigurationContext;
/**
- *
+ *
*/
public class ControlCommandProcessor {
private ConfigurationContext configurationContext;
@@ -70,6 +72,10 @@
getConfigRespCmd.
setServiceGroups(((GetConfigurationCommand) command).getServiceGroupNames());
return getConfigRespCmd;
+ } else if (command instanceof JoinGroupCommand) {
+
+ command.execute(configurationContext);
+ return new MemberListCommand();
}
return null;
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java?rev=658777&r1=658776&r2=658777&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java Wed May 21 10:35:58 2008
@@ -23,6 +23,7 @@
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.RpcCallback;
@@ -45,16 +46,27 @@
}
public Serializable replyRequest(Serializable msg, Member member) {
- if (msg instanceof GetStateCommand || msg instanceof GetConfigurationCommand) {
+ if (msg instanceof GetStateCommand ||
+ msg instanceof GetConfigurationCommand) {
try {
log.info("Received " + msg + " initialization request message from " +
TribesUtil.getHost(member));
- return controlCommandProcessor.process((ControlCommand) msg);
+ return controlCommandProcessor.process((ControlCommand) msg); // response is either GetConfigurationResponseCommand or GetStateResponseCommand
} catch (ClusteringFault e) {
String errMsg = "Cannot handle initialization request";
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
+ } else if (msg instanceof JoinGroupCommand) {
+ log.info("Received " + msg + " from " + TribesUtil.getHost(member));
+// JoinGroupCommand command = (JoinGroupCommand) msg;
+ try {
+ return controlCommandProcessor.process((ControlCommand) msg); // response is
+ } catch (ClusteringFault e) {
+ String errMsg = "Cannot handle JOIN request";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
+ }
}
return null;
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=658777&r1=658776&r2=658777&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 21 10:35:58 2008
@@ -33,6 +33,7 @@
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.HandlerDescription;
import org.apache.axis2.description.Parameter;
@@ -78,6 +79,7 @@
*/
public class TribesClusterManager implements ClusterManager {
public static final int MSG_ORDER_OPTION = 512;
+
private static final Log log = LogFactory.getLog(TribesClusterManager.class);
private DefaultConfigurationManager configurationManager;
@@ -169,6 +171,24 @@
log.info("Local Member " + TribesUtil.getLocalHost(channel));
TribesUtil.printMembers(membershipManager);
+ // If a WKA scheme is used, JOIN the group and get the member list
+ if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
+ try {
+ Response[] responses = rpcChannel.send(membershipManager.getMembers(),
+ new JoinGroupCommand(),
+ RpcChannel.FIRST_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
+ if (responses.length > 0) {
+ ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+ }
+ } catch (ChannelException e) {
+ String msg = "Could not JOIN group";
+ log.error(e);
+ throw new ClusteringFault(msg, e);
+ }
+ }
+
// If configuration management is enabled, get the latest config from a neighbour
if (configurationManager != null) {
configurationManager.setSender(channelSender);
@@ -362,7 +382,9 @@
} catch (IOException e) {
String msg =
"Could not allocate a port in the range 4000-4100 for local host " +
- localMember.getHostname();
+ localMember.getHostname() +
+ ". Check whether the IP address specified or inferred for the local " +
+ "member is correct.";
log.error(msg, e);
throw new ClusteringFault(msg, e);
}
@@ -434,7 +456,7 @@
protected int getLocalPort(ServerSocket socket, String hostname,
int portstart, int retries) throws IOException {
InetSocketAddress addr = null;
- while (retries > 0) {
+ if (retries > 0) {
try {
addr = new InetSocketAddress(hostname, portstart);
socket.bind(addr);
@@ -459,10 +481,10 @@
} catch (InterruptedException ignored) {
ignored.printStackTrace();
}
- retries = getLocalPort(socket, hostname, portstart, retries);
+ getLocalPort(socket, hostname, portstart, retries);
}
}
- return retries;
+ return portstart;
}
/**
@@ -505,10 +527,6 @@
orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
channel.addInterceptor(orderInterceptor);
- // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
- AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
- channel.addInterceptor(atMostOnceInterceptor);
-
// Add a reliable failure detector
TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
tcpFailureDetector.setPrevious(dfi);
@@ -518,6 +536,10 @@
staticMembershipInterceptor = new StaticMembershipInterceptor();
channel.addInterceptor(staticMembershipInterceptor);
}
+
+ // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
+ AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+ channel.addInterceptor(atMostOnceInterceptor);
}
/**
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=658777&r1=658776&r2=658777&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java Wed May 21 10:35:58 2008
@@ -33,4 +33,11 @@
public static final String BIND_ADDRESS = "bindAddress";
public static final String TCP_LISTEN_PORT = "tcpListenPort";
public static final String MAX_RETRIES = "maxRetries";
+
+ public final class MembershipMessages {
+ public static final int JOIN = 3;
+ public static final int LEAVE = 5;
+ public static final int NEW_MEMBER_JOINED = 7;
+ public static final int MEMBER_LIST = 11;
+ }
}