You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2007/04/14 01:26:08 UTC
svn commit: r528702 - in
/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors:
TcpFailureDetector.java TcpPingInterceptor.java
Author: fhanik
Date: Fri Apr 13 16:26:07 2007
New Revision: 528702
URL: http://svn.apache.org/viewvc?view=rev&rev=528702
Log:
Added a TCP ping for membership, to be used with static memberships and with the TCP failure detector
Added:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java
Modified:
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java?view=diff&rev=528702&r1=528701&r2=528702
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpFailureDetector.java Fri Apr 13 16:26:07 2007
@@ -180,53 +180,80 @@
}
public void heartbeat() {
+ checkMembers(false);
+ }
+ public void checkMembers(boolean checkAll) {
+
try {
if (membership == null) setupMembership();
synchronized (membership) {
- //update all alive times
- Member[] members = super.getMembers();
- for (int i = 0; members != null && i < members.length; i++) {
- if (membership.memberAlive( (MemberImpl) members[i])) {
- //we don't have this one in our membership, check to see if he/she is alive
- if (memberAlive(members[i])) {
- log.warn("Member added, even though we werent notified:" + members[i]);
- super.memberAdded(members[i]);
- } else {
- membership.removeMember( (MemberImpl) members[i]);
- } //end if
- } //end if
- } //for
-
- //check suspect members if they are still alive,
- //if not, simply issue the memberDisappeared message
- MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
- for (int i = 0; i < keys.length; i++) {
- MemberImpl m = (MemberImpl) keys[i];
- if (membership.getMember(m) != null && (!memberAlive(m))) {
- membership.removeMember(m);
- super.memberDisappeared(m);
- removeSuspects.remove(m);
- log.info("Suspect member, confirmed dead.["+m+"]");
- } //end if
- }
-
- //check add suspects members if they are alive now,
- //if they are, simply issue the memberAdded message
- keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
- for (int i = 0; i < keys.length; i++) {
- MemberImpl m = (MemberImpl) keys[i];
- if ( membership.getMember(m) == null && (memberAlive(m))) {
- membership.memberAlive(m);
- super.memberAdded(m);
- addSuspects.remove(m);
- log.info("Suspect member, confirmed alive.["+m+"]");
- } //end if
- }
+ if ( !checkAll ) performBasicCheck();
+ else performForcedCheck();
}
}catch ( Exception x ) {
log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
} finally {
super.heartbeat();
+ }
+ }
+
+ protected void performForcedCheck() {
+ //update all alive times
+ Member[] members = super.getMembers();
+ for (int i = 0; members != null && i < members.length; i++) {
+ if (memberAlive(members[i])) {
+ if (membership.memberAlive((MemberImpl)members[i])) super.memberAdded(members[i]);
+ addSuspects.remove(members[i]);
+ } else {
+ if (membership.getMember(members[i])!=null) {
+ membership.removeMember((MemberImpl)members[i]);
+ removeSuspects.remove(members[i]);
+ super.memberDisappeared((MemberImpl)members[i]);
+ }
+ } //end if
+ } //for
+
+ }
+
+ protected void performBasicCheck() {
+ //update all alive times
+ Member[] members = super.getMembers();
+ for (int i = 0; members != null && i < members.length; i++) {
+ if (membership.memberAlive( (MemberImpl) members[i])) {
+ //we don't have this one in our membership, check to see if he/she is alive
+ if (memberAlive(members[i])) {
+ log.warn("Member added, even though we werent notified:" + members[i]);
+ super.memberAdded(members[i]);
+ } else {
+ membership.removeMember( (MemberImpl) members[i]);
+ } //end if
+ } //end if
+ } //for
+
+ //check suspect members if they are still alive,
+ //if not, simply issue the memberDisappeared message
+ MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
+ for (int i = 0; i < keys.length; i++) {
+ MemberImpl m = (MemberImpl) keys[i];
+ if (membership.getMember(m) != null && (!memberAlive(m))) {
+ membership.removeMember(m);
+ super.memberDisappeared(m);
+ removeSuspects.remove(m);
+ log.info("Suspect member, confirmed dead.["+m+"]");
+ } //end if
+ }
+
+ //check add suspects members if they are alive now,
+ //if they are, simply issue the memberAdded message
+ keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
+ for (int i = 0; i < keys.length; i++) {
+ MemberImpl m = (MemberImpl) keys[i];
+ if ( membership.getMember(m) == null && (memberAlive(m))) {
+ membership.memberAlive(m);
+ super.memberAdded(m);
+ addSuspects.remove(m);
+ log.info("Suspect member, confirmed alive.["+m+"]");
+ } //end if
}
}
Added: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java?view=auto&rev=528702
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java (added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/TcpPingInterceptor.java Fri Apr 13 16:26:07 2007
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.group.interceptors;
+
+import java.lang.ref.WeakReference;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.io.ChannelData;
+
+/**
+ *
+ * Sends a ping to all members.
+ * Configure this interceptor with the TcpFailureDetector below it,
+ * and the TcpFailureDetector will act as the membership guide.
+ * @author Filip Hanik
+ * @version 1.0
+ */
+
+public class TcpPingInterceptor extends ChannelInterceptorBase {
+
+ protected static org.apache.juli.logging.Log log =
+ org.apache.juli.logging.LogFactory.getLog(TcpPingInterceptor.class);
+
+ protected static byte[] TCP_PING_DATA = new byte[] {
+ 79, -89, 115, 72, 121, -33, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
+ 125, -39, 82, 91, -21, -33, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
+ 55, 21, -66, -121, 69, 33, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
+ 85, -10, -108, -73, 58, -33, 33, 120, -111, 4, 125, -41, 114, -124, -64, -43};
+
+ protected long interval = 1000; //1 second
+
+ protected boolean useThread = false;
+ protected boolean staticOnly = false;
+ protected boolean running = true;
+ protected PingThread thread = null;
+ protected static AtomicInteger cnt = new AtomicInteger(0);
+
+ WeakReference<TcpFailureDetector> failureDetector = null;
+ WeakReference<StaticMembershipInterceptor> staticMembers = null;
+
+ public synchronized void start(int svc) throws ChannelException {
+ super.start(svc);
+ running = true;
+ if ( thread == null ) {
+ thread = new PingThread();
+ thread.setDaemon(true);
+ thread.setName("TcpPingInterceptor.PingThread-"+cnt.addAndGet(1));
+ thread.start();
+ }
+
+ //acquire the interceptors to invoke on send ping events
+ ChannelInterceptor next = getNext();
+ while ( next != null ) {
+ if ( next instanceof TcpFailureDetector )
+ failureDetector = new WeakReference<TcpFailureDetector>((TcpFailureDetector)next);
+ if ( next instanceof StaticMembershipInterceptor )
+ staticMembers = new WeakReference<StaticMembershipInterceptor>((StaticMembershipInterceptor)next);
+ next = next.getNext();
+ }
+
+ }
+
+ public void stop(int svc) throws ChannelException {
+ running = false;
+ if ( thread != null ) thread.interrupt();
+ thread = null;
+ super.stop(svc);
+ }
+
+ public void heartbeat() {
+ super.heartbeat();
+ if (!getUseThread()) sendPing();
+ }
+
+ public long getInterval() {
+ return interval;
+ }
+
+ public void setInterval(long interval) {
+ this.interval = interval;
+ }
+
+ public void setUseThread(boolean useThread) {
+ this.useThread = useThread;
+ }
+
+ public void setStaticOnly(boolean staticOnly) {
+ this.staticOnly = staticOnly;
+ }
+
+ public boolean getUseThread() {
+ return useThread;
+ }
+
+ public boolean getStaticOnly() {
+ return staticOnly;
+ }
+
+ protected void sendPing() {
+ if (failureDetector.get()!=null) {
+ //we have a reference to the failure detector
+ //piggy back on that dude
+ failureDetector.get().checkMembers(true);
+ }else {
+ if (staticOnly && staticMembers.get()!=null) {
+ sendPingMessage(staticMembers.get().getMembers());
+ } else {
+ sendPingMessage(getMembers());
+ }
+ }
+ }
+
+ protected void sendPingMessage(Member[] members) {
+ if ( members == null || members.length == 0 ) return;
+ ChannelData data = new ChannelData(true);//generates a unique Id
+ data.setAddress(getLocalMember(false));
+ data.setTimestamp(System.currentTimeMillis());
+ data.setOptions(getOptionFlag());
+ try {
+ super.sendMessage(members, data, null);
+ }catch (ChannelException x) {
+ log.warn("Unable to send TCP ping.",x);
+ }
+ }
+
+ public void messageReceived(ChannelMessage msg) {
+ //catch incoming
+ boolean process = true;
+ if ( okToProcess(msg.getOptions()) ) {
+ //check to see if it is a ping message, if so, process = false
+ process = ( (msg.getMessage().getLength() != TCP_PING_DATA.length) ||
+ (!Arrays.equals(TCP_PING_DATA,msg.getMessage().getBytes()) ) );
+ }//end if
+
+ //ignore the message, it doesnt have the flag set
+ if ( process ) super.messageReceived(msg);
+ else if ( log.isDebugEnabled() ) log.debug("Received a TCP ping packet:"+msg);
+ }//messageReceived
+
+ protected class PingThread extends Thread {
+ public void run() {
+ while (running) {
+ try {
+ sleep(interval);
+ sendPing();
+ }catch ( InterruptedException ix ) {
+ interrupted();
+ }catch ( Exception x ) {
+ log.warn("Unable to send ping from TCP ping thread.",x);
+ }
+ }
+ }
+ }
+
+
+
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org