You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kf...@apache.org on 2015/12/15 07:29:03 UTC

svn commit: r1720074 - in /tomcat/trunk: java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java webapps/docs/changelog.xml

Author: kfujino
Date: Tue Dec 15 06:29:03 2015
New Revision: 1720074

URL: http://svn.apache.org/viewvc?rev=1720074&view=rev
Log:
Add support for the startup notification of local members in the static cluster.

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
    tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties?rev=1720074&r1=1720073&r2=1720074&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties Tue Dec 15 06:29:03 2015
@@ -34,6 +34,7 @@ nonBlockingCoordinator.heartbeat.failed=
 orderInterceptor.messageAdded.sameCounter=Message added has the same counter, synchronization bug. Disable the order interceptor
 staticMembershipInterceptor.no.failureDetector=There is no TcpFailureDetector. Automatic detection of static members does not work properly. By defining the StaticMembershipInterceptor under the TcpFailureDetector, automatic detection of the static members will work.
 staticMembershipInterceptor.no.pingInterceptor=There is no TcpPingInterceptor. The health check of static members does not work properly. By defining the TcpPingInterceptor, the health check of static members will work.
+staticMembershipInterceptor.sendLocalMember.failed=Local member notification failed.
 tcpFailureDetector.memberDisappeared.verify=Received memberDisappeared[{0}] message. Will verify.
 tcpFailureDetector.already.disappeared=Verification complete. Member already disappeared[{0}]
 tcpFailureDetector.member.disappeared=Verification complete. Member disappeared[{0}]

Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java?rev=1720074&r1=1720073&r2=1720074&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/StaticMembershipInterceptor.java Tue Dec 15 06:29:03 2015
@@ -17,13 +17,17 @@
 package org.apache.catalina.tribes.group.interceptors;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.AbsoluteOrder;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.catalina.tribes.io.XByteBuffer;
 import org.apache.catalina.tribes.util.StringManager;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -34,6 +38,10 @@ public class StaticMembershipInterceptor
     protected static final StringManager sm =
             StringManager.getManager(StaticMembershipInterceptor.class);
 
+    protected static final byte[] MEMBER_START = new byte[] {
+        76, 111, 99, 97, 108, 32, 83, 116, 97, 116, 105, 99, 77, 101, 109, 98, 101, 114, 32, 78,
+        111, 116, 105, 102, 105, 99, 97, 116, 105, 111, 110, 32, 68, 97, 116, 97};
+
     protected final ArrayList<Member> members = new ArrayList<>();
     protected Member localMember = null;
 
@@ -57,6 +65,21 @@ public class StaticMembershipInterceptor
         this.localMember = member;
     }
 
+    @Override
+    public void messageReceived(ChannelMessage msg) {
+        if (msg.getMessage().getLength() == MEMBER_START.length &&
+                Arrays.equals(MEMBER_START, msg.getMessage().getBytes())) {
+            // receive member start
+            Member member = getMember(msg.getAddress());
+            if (member != null) {
+                super.memberAdded(member);
+            }
+            
+        } else {
+            super.messageReceived(msg);
+        }
+    }
+
     /**
      * has members
      */
@@ -115,17 +138,19 @@ public class StaticMembershipInterceptor
     public void start(int svc) throws ChannelException {
         if ( (Channel.SND_RX_SEQ&svc)==Channel.SND_RX_SEQ ) super.start(Channel.SND_RX_SEQ);
         if ( (Channel.SND_TX_SEQ&svc)==Channel.SND_TX_SEQ ) super.start(Channel.SND_TX_SEQ);
-        final Member[] mbrs = members.toArray(new Member[members.size()]);
         final ChannelInterceptorBase base = this;
-        Thread t = new Thread() {
-            @Override
-            public void run() {
-                for (int i=0; i<mbrs.length; i++ ) {
-                    base.memberAdded(mbrs[i]);
+        for (Member member : members) {
+            Thread t = new Thread() {
+                @Override
+                public void run() {
+                    base.memberAdded(member);
+                    if (base.getPrevious().getMember(member) != null) {
+                        sendLocalMember(new Member[]{member});
+                    }
                 }
-            }
-        };
-        t.start();
+            };
+            t.start();
+        }
         super.start(svc & (~Channel.SND_RX_SEQ) & (~Channel.SND_TX_SEQ));
 
         // check required interceptors
@@ -145,4 +170,17 @@ public class StaticMembershipInterceptor
         }
     }
 
+    protected void sendLocalMember(Member[] members) {
+        if ( members == null || members.length == 0 ) return;
+        ChannelData data = new ChannelData(true);
+        data.setAddress(getLocalMember(false));
+        data.setTimestamp(System.currentTimeMillis());
+        data.setOptions(getOptionFlag());
+        data.setMessage(new XByteBuffer(MEMBER_START, false));
+        try {
+            super.sendMessage(members, data, null);
+        }catch (ChannelException cx) {
+            log.warn(sm.getString("staticMembershipInterceptor.sendLocalMember.failed"),cx);
+        }
+    }
 }
\ No newline at end of file

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1720074&r1=1720073&r2=1720074&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Tue Dec 15 06:29:03 2015
@@ -233,6 +233,10 @@
         the membership service to the map members list in order to ensure that
         the map member is a static member. (kfujino)
       </fix>
+      <fix>
+        Add support for the startup notification of local members in the static
+        cluster. (kfujino)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="jdbc-pool">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org