You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/06/14 19:47:01 UTC
svn commit: r414323 - in /tomcat/container/tc5.5.x/modules/groupcom:
src/share/org/apache/catalina/tribes/group/interceptors/
src/share/org/apache/catalina/tribes/transport/
src/share/org/apache/catalina/tribes/transport/bio/
src/share/org/apache/catal...
Author: fhanik
Date: Wed Jun 14 10:47:00 2006
New Revision: 414323
URL: http://svn.apache.org/viewvc?rev=414323&view=rev
Log:
Further enhancements, looking pretty good
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java Wed Jun 14 10:47:00 2006
@@ -192,7 +192,14 @@
return; //the only member, no need for an election
}
if ( suggestedviewId != null ) {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
+
+ if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 && Arrays.diff(suggestedView,view,local).length == 0) {
+ suggestedviewId = null;
+ suggestedView = null;
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
+ } else {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
+ }
return; //election already running, I'm not allowed to have two of them
}
if ( view != null && Arrays.diff(view,membership,local).length == 0 && Arrays.diff(membership,view,local).length == 0) {
@@ -218,10 +225,11 @@
}
if ( suggestedviewId == null && (!coordMsgReceived.get())) {
//no message arrived, send the coord msg
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
- startElection(true);
+// fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
+// startElection(true);
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
} else {
- fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned"));
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
}
}//end if
@@ -307,7 +315,6 @@
}
protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
-// synchronized (electionMutex) {
coordMsgReceived.set(true);
msg.timestamp = System.currentTimeMillis();
Membership merged = mergeOnArrive(msg,sender);
@@ -440,7 +447,6 @@
// OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE
//============================================================================================================
public void start(int svc) throws ChannelException {
-// synchronized (electionMutex) {
if (membership == null) setupMembership();
if (started)return;
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
@@ -449,7 +455,6 @@
if (view == null) view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
startElection(false);
-// }
}
public void stop(int svc) throws ChannelException {
@@ -527,9 +532,8 @@
super.memberDisappeared(member);
try {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
- if ( started && (isCoordinator() || member.equals(getCoordinator())) )
- startElection(false);
- //to do, if a member disappears, only the coordinator can start
+ if ( started && (isCoordinator() || isHighest()) )
+ startElection(true); //to do, if a member disappears, only the coordinator can start
}catch ( ChannelException x ) {
log.error("Unable to start election when member was removed.",x);
}
@@ -537,13 +541,32 @@
}
}
+ public boolean isHighest() {
+ Member local = getLocalMember(false);
+ if ( membership.getMembers().length == 0 ) return true;
+ else return AbsoluteOrder.comp.compare(local,membership.getMembers()[0])<=0;
+ }
+
public boolean isCoordinator() {
Member coord = getCoordinator();
return coord != null && getLocalMember(false).equals(coord);
}
public void heartbeat() {
- super.heartbeat();
+ try {
+ MemberImpl local = (MemberImpl)getLocalMember(false);
+ if ( view != null && (Arrays.diff(view,membership,local).length != 0 || Arrays.diff(membership,view,local).length != 0) ) {
+ if ( isHighest() ) {
+ fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
+ "Heartbeat found inconsistency, restart election"));
+ startElection(true);
+ }
+ }
+ } catch ( Exception x ){
+ log.error("Unable to perform heartbeat.",x);
+ } finally {
+ super.heartbeat();
+ }
}
/**
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Wed Jun 14 10:47:00 2006
@@ -111,39 +111,45 @@
}//messageReceived
- public synchronized void memberAdded(Member member) {
+ public void memberAdded(Member member) {
if ( membership == null ) setupMembership();
- if ( removeSuspects.containsKey(member) ) {
- //previously marked suspect, system below picked up the member again
- removeSuspects.remove(member);
- } else {
- //if we add it here, then add it upwards too
- if ( membership.getMember((MemberImpl)member) == null) {
+ boolean notify = false;
+ synchronized (membership) {
+ if (removeSuspects.containsKey(member)) {
+ //previously marked suspect, system below picked up the member again
+ removeSuspects.remove(member);
+ } else if (membership.getMember( (MemberImpl) member) == null){
+ //if we add it here, then add it upwards too
//check to see if it is alive
if (memberAlive(member)) {
- membership.memberAlive((MemberImpl)member);
- super.memberAdded(member);
+ membership.memberAlive( (MemberImpl) member);
+ notify = true;
} else {
addSuspects.put(member, new Long(System.currentTimeMillis()));
}
- }
+ }
}
+ if ( notify ) super.memberAdded(member);
}
- public synchronized void memberDisappeared(Member member) {
+ public void memberDisappeared(Member member) {
if ( membership == null ) setupMembership();
+ boolean notify = false;
boolean shutdown = Arrays.equals(member.getPayload(),Member.SHUTDOWN_PAYLOAD);
if ( !shutdown ) log.info("Received memberDisappeared["+member+"] message. Will verify.");
- //check to see if the member really is gone
- //if the payload is not a shutdown message
- if ( shutdown || !memberAlive(member) ) {
- //not correct, we need to maintain the map
- membership.removeMember((MemberImpl)member);
- super.memberDisappeared(member);
- } else {
- //add the member as suspect
- removeSuspects.put(member, new Long(System.currentTimeMillis()));
+ synchronized (membership) {
+ //check to see if the member really is gone
+ //if the payload is not a shutdown message
+ if (shutdown || !memberAlive(member)) {
+ //not correct, we need to maintain the map
+ membership.removeMember( (MemberImpl) member);
+ notify = true;
+ } else {
+ //add the member as suspect
+ removeSuspects.put(member, new Long(System.currentTimeMillis()));
+ }
}
+ if ( notify ) super.memberDisappeared(member);
}
public boolean hasMembers() {
@@ -165,47 +171,48 @@
return super.getLocalMember(incAlive);
}
- public synchronized void heartbeat() {
+ public void heartbeat() {
try {
if (membership == null) setupMembership();
- //update all alive times
- Member[] members = super.getMembers();
- for (int i = 0; members != null && i < members.length; i++) {
- if (membership.memberAlive( (MemberImpl) members[i])) {
- //we don't have this one in our membership, check to see if he/she is alive
- if (memberAlive(members[i])) {
- log.warn("Member added, even though we werent notified:" + members[i]);
- super.memberAdded(members[i]);
- } else {
- membership.removeMember( (MemberImpl) members[i]);
+ synchronized (membership) {
+ //update all alive times
+ Member[] members = super.getMembers();
+ for (int i = 0; members != null && i < members.length; i++) {
+ if (membership.memberAlive( (MemberImpl) members[i])) {
+ //we don't have this one in our membership, check to see if he/she is alive
+ if (memberAlive(members[i])) {
+ log.warn("Member added, even though we werent notified:" + members[i]);
+ super.memberAdded(members[i]);
+ } else {
+ membership.removeMember( (MemberImpl) members[i]);
+ } //end if
} //end if
- } //end if
- } //for
+ } //for
- //check suspect members if they are still alive,
- //if not, simply issue the memberDisappeared message
- MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
- for (int i = 0; i < keys.length; i++) {
- MemberImpl m = (MemberImpl) keys[i];
- if (membership.getMember(m) != null && (!memberAlive(m))) {
- membership.removeMember(m);
- super.memberDisappeared(m);
- removeSuspects.remove(m);
- } //end if
- }
+ //check suspect members if they are still alive,
+ //if not, simply issue the memberDisappeared message
+ MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
+ for (int i = 0; i < keys.length; i++) {
+ MemberImpl m = (MemberImpl) keys[i];
+ if (membership.getMember(m) != null && (!memberAlive(m))) {
+ membership.removeMember(m);
+ super.memberDisappeared(m);
+ removeSuspects.remove(m);
+ } //end if
+ }
- //check add suspects members if they are alive now,
- //if they are, simply issue the memberAdded message
- keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
- for (int i = 0; i < keys.length; i++) {
- MemberImpl m = (MemberImpl) keys[i];
- if ( membership.getMember(m) == null && (memberAlive(m))) {
- membership.memberAlive(m);
- super.memberAdded(m);
- addSuspects.remove(m);
- } //end if
+ //check add suspects members if they are alive now,
+ //if they are, simply issue the memberAdded message
+ keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
+ for (int i = 0; i < keys.length; i++) {
+ MemberImpl m = (MemberImpl) keys[i];
+ if ( membership.getMember(m) == null && (memberAlive(m))) {
+ membership.memberAlive(m);
+ super.memberAdded(m);
+ addSuspects.remove(m);
+ } //end if
+ }
}
-
}catch ( Exception x ) {
log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
} finally {
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/ReceiverBase.java Wed Jun 14 10:47:00 2006
@@ -177,10 +177,11 @@
}catch ( IOException x) {
retries--;
if ( retries <= 0 ) {
- log.info("Unable to bind server socket to "+addr+" throwing error.");
+ log.info("Unable to bind server socket to:"+addr+" throwing error.");
throw x;
}
portstart++;
+ try {Thread.sleep(25);}catch( InterruptedException ti){Thread.currentThread().interrupted();}
retries = bind(socket,portstart,retries);
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/bio/BioReceiver.java Wed Jun 14 10:47:00 2006
@@ -55,9 +55,10 @@
public void start() throws IOException {
try {
setPool(new ThreadPool(new Object(),getMaxThreads(),getMinThreads(),this));
- } catch (Exception e) {
- log.error("ThreadPool can initilzed. Listener not started", e);
- return;
+ } catch (Exception x) {
+ log.fatal("ThreadPool can initilzed. Listener not started", x);
+ if ( x instanceof IOException ) throw (IOException)x;
+ else throw new IOException(x.getMessage());
}
try {
getBind();
@@ -67,6 +68,8 @@
t.start();
} catch (Exception x) {
log.fatal("Unable to start cluster receiver", x);
+ if ( x instanceof IOException ) throw (IOException)x;
+ else throw new IOException(x.getMessage());
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/transport/nio/NioReceiver.java Wed Jun 14 10:47:00 2006
@@ -83,12 +83,13 @@
* @throws Exception
* @see org.apache.catalina.tribes.ClusterReceiver#start()
*/
- public void start() {
+ public void start() throws IOException {
try {
setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this));
- } catch (Exception e) {
- log.error("ThreadPool can initilzed. Listener not started", e);
- return;
+ } catch (Exception x) {
+ log.fatal("ThreadPool can initilzed. Listener not started", x);
+ if ( x instanceof IOException ) throw (IOException)x;
+ else throw new IOException(x.getMessage());
}
try {
getBind();
@@ -98,6 +99,8 @@
t.start();
} catch (Exception x) {
log.fatal("Unable to start cluster receiver", x);
+ if ( x instanceof IOException ) throw (IOException)x;
+ else throw new IOException(x.getMessage());
}
}
Modified: tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java
URL: http://svn.apache.org/viewvc/tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java?rev=414323&r1=414322&r2=414323&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/test/java/org/apache/catalina/tribes/demos/CoordinationDemo.java Wed Jun 14 10:47:00 2006
@@ -21,7 +21,9 @@
static int CHANNEL_COUNT = 5;
static int SCREEN_WIDTH = 120;
static long SLEEP_TIME = 10;
+ static int CLEAR_SCREEN = 30;
static boolean MULTI_THREAD = false;
+ static boolean[] VIEW_EVENTS = new boolean[255];
StringBuffer statusLine = new StringBuffer();
Status[] status = null;
BufferedReader reader = null;
@@ -36,7 +38,7 @@
public void clearScreen() {
StringBuffer buf = new StringBuffer(700);
- for (int i=0; i<30; i++ ) buf.append("\n");
+ for (int i=0; i<CLEAR_SCREEN; i++ ) buf.append("\n");
System.out.println(buf);
}
@@ -165,20 +167,39 @@
-
-
-
+ public static void setEvents(String events) {
+ java.util.Arrays.fill(VIEW_EVENTS,false);
+ StringTokenizer t = new StringTokenizer(events,",");
+ while (t.hasMoreTokens() ) {
+ int idx = Integer.parseInt(t.nextToken());
+ VIEW_EVENTS[idx] = true;
+ }
+ }
+
public static void main(String[] args) throws Exception {
System.out.println("Usage:");
- System.out.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo [channel-count multi-thread]");
+ System.out.println("\tjava org.apache.catalina.tribes.demos.CoordinationDemo -c channel-count(int) -t multi-thread(true|false) -s sleep-time(ms) -sc clear-screen(int) -p view_events_csv(1,2,5,7)");
System.out.println("Example:");
System.out.println("\tjava o.a.c.t.d.CoordinationDemo -> starts demo single threaded start/stop with 5 channels");
- System.out.println("\tjava o.a.c.t.d.CoordinationDemo 10 -> starts demo single threaded start/stop with 10 channels");
- System.out.println("\tjava o.a.c.t.d.CoordinationDemo 7 true-> starts demo multi threaded start/stop with 7 channels");
+ System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 10 -> starts demo single threaded start/stop with 10 channels");
+ System.out.println("\tjava o.a.c.t.d.CoordinationDemo -c 7 -t true -s 1000 -sc 50-> starts demo multi threaded start/stop with 7 channels and 1 second sleep time between events and 50 lines to clear screen");
+ System.out.println("\tjava o.a.c.t.d.CoordinationDemo -t true -p 12 -> starts demo multi threaded start/stop with 5 channels and only prints the EVT_CONF_RX event");
System.out.println();
-
- if ( args.length >= 1 ) CHANNEL_COUNT = Integer.parseInt(args[0]);
- if ( args.length >= 2 ) MULTI_THREAD = true;
+ java.util.Arrays.fill(VIEW_EVENTS,true);
+
+ for (int i=0; i<args.length; i++ ) {
+ if ( "-c".equals(args[i]) )
+ CHANNEL_COUNT = Integer.parseInt(args[++i]);
+ else if ( "-t".equals(args[i]) )
+ MULTI_THREAD = Boolean.parseBoolean(args[++i]);
+ else if ( "-s".equals(args[i]) )
+ SLEEP_TIME = Long.parseLong(args[++i]);
+ else if ( "-sc".equals(args[i]) )
+ CLEAR_SCREEN = Integer.parseInt(args[++i]);
+ else if ( "-p".equals(args[i]) )
+ setEvents(args[++i]);
+ else if ( "-h".equals(args[i]) ) System.exit(0);
+ }
CoordinationDemo demo = new CoordinationDemo();
demo.waitForInput();
}
@@ -265,6 +286,9 @@
status = "Start failed:"+x.getMessage();
error = x;
startstatus = "failed";
+ try { channel.stop(GroupChannel.DEFAULT);}catch(Exception ignore){}
+ channel = null;
+ interceptor = null;
}
}
@@ -298,11 +322,10 @@
interceptor = new NonBlockingCoordinator() {
public void fireInterceptorEvent(InterceptorEvent event) {
status = event.getEventTypeDesc();
-// if ( event instanceof NonBlockingCoordinator.CoordinationEvent &&
-// ((NonBlockingCoordinator.CoordinationEvent)event).getEventType() == NonBlockingCoordinator.CoordinationEvent.EVT_CONF_RX)
- parent.printScreen();
+ int type = event.getEventType();
+ boolean display = VIEW_EVENTS[type];
+ if ( display ) parent.printScreen();
try { Thread.sleep(SLEEP_TIME); }catch ( Exception x){}
-
}
};
channel.addInterceptor(interceptor);
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org