You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by kf...@apache.org on 2018/07/26 09:27:35 UTC

svn commit: r1836707 - in /tomcat/trunk/java/org/apache/catalina/tribes/membership: StaticMembershipProvider.java StaticMembershipService.java

Author: kfujino
Date: Thu Jul 26 09:27:35 2018
New Revision: 1836707

URL: http://svn.apache.org/viewvc?rev=1836707&view=rev
Log:
Add New Static Membership Service implementations.
- initial implementaion that remain a lot of TODOs. 

Added:
    tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java   (with props)
    tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java   (with props)

Added: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java?rev=1836707&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java (added)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java Thu Jul 26 09:27:35 2018
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelException.FaultyMember;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipService;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.catalina.tribes.util.StringManager;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+public class StaticMembershipProvider extends MembershipProviderBase implements RpcCallback, ChannelListener, Heartbeat {
+
+    protected static final StringManager sm = StringManager.getManager(StaticMembershipProvider.class);
+    private static final Log log = LogFactory.getLog(StaticMembershipProvider.class);
+ 
+    protected Channel channel;
+    protected RpcChannel rpcChannel;
+    protected MembershipService service;
+    private String membershipName = null;
+    private byte[] membershipId = null;
+    protected ArrayList<StaticMember> staticMembers;
+    protected int sendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS;
+    protected long expirationTime = 5000;
+    protected int connectTimeout = 500;
+    protected long rpcTimeout = 3000;
+    protected int startLevel = 0;
+
+    @Override
+    public void init(Properties properties) throws Exception {
+        String expirationTimeStr = properties.getProperty("expirationTime");
+        this.expirationTime = Long.parseLong(expirationTimeStr);
+        String connectTimeoutStr = properties.getProperty("connectTimeout");
+        this.connectTimeout = Integer.parseInt(connectTimeoutStr);
+        String rpcTimeouStr = properties.getProperty("rpcTimeout");
+        this.rpcTimeout = Long.parseLong(rpcTimeouStr);
+        this.membershipName = properties.getProperty("membershipName");;
+        this.membershipId = membershipName.getBytes(StandardCharsets.ISO_8859_1);
+        membership = new Membership(service.getLocalMember(true));
+        this.rpcChannel = new RpcChannel(this.membershipId, channel, this);
+        this.channel.addChannelListener(this);
+    }
+
+    @Override
+    public void start(int level) throws Exception {
+        if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) {
+            //no-op
+        }
+        if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) {
+            //no-op
+        }
+        startLevel = (startLevel | level);
+        if (startLevel == (Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ)) {
+            startMembership(getAliveMembers(staticMembers.toArray(new Member[0])));
+        }
+    }
+
+    @Override
+    public boolean stop(int level) throws Exception {
+        if (Channel.MBR_RX_SEQ==(level & Channel.MBR_RX_SEQ)) {
+            // no-op
+        }
+        if (Channel.MBR_TX_SEQ==(level & Channel.MBR_TX_SEQ)) {
+            // no-op
+        }
+        startLevel = (startLevel & (~level));
+        if ( startLevel == 0 ) {
+            if (this.rpcChannel != null) {
+                this.rpcChannel.breakdown();
+            }
+            if (this.channel != null) {
+                try {
+                    stopMembership(this.getMembers());
+                } catch ( Exception ignore){}
+                this.channel.removeChannelListener(this);
+                this.channel = null;
+            }
+            this.rpcChannel = null;
+            this.membership.reset();
+        }
+        return (startLevel == 0);
+    }
+
+    protected void startMembership(Member[] members) throws ChannelException {
+        if (members.length == 0) return;
+        MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_START, service.getLocalMember(true));
+        Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout);
+        if (resp.length > 0) {
+            for (int i = 0; i < resp.length; i++) {
+                messageReceived(resp[i].getMessage(), resp[i].getSource());
+            }
+        } else {
+            log.warn("no response");
+        }
+    }
+
+    protected Member setupMember(Member mbr) {
+        // no-op
+        return mbr;
+    }
+
+    protected void memberAdded(Member member) {
+        Member mbr = setupMember(member);
+        if(membership.memberAlive(mbr)) {
+            // TODO invoke thread
+            membershipListener.memberAdded(mbr);
+        }
+    }
+
+    protected void memberDisappeared(Member member) {
+        membership.removeMember(member);
+        // TODO invoke thread
+        membershipListener.memberDisappeared(member);
+    }
+
+    protected void memberAlive(Member member) {
+        if (!membership.contains(member)) memberAdded(member);
+        membership.memberAlive(member);
+    }
+
+    protected void stopMembership(Member[] members) {
+        if (members.length == 0 ) return;
+        MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_STOP, service.getLocalMember(true));
+        try {
+            channel.send(members, msg, sendOptions);
+        } catch (ChannelException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void messageReceived(Serializable msg, Member sender) {
+        MemberMessage memMsg = (MemberMessage) msg;
+        Member member = memMsg.getMember();
+        if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+            memberAdded(member);
+        } else if (memMsg.getMsgtype() == MemberMessage.MSG_STOP) {
+            memberDisappeared(member);
+        } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+            memberAlive(member);
+        }
+    }
+
+    @Override
+    public boolean accept(Serializable msg, Member sender) {
+        boolean result = false;
+        if (msg instanceof MemberMessage) {
+            result = Arrays.equals(this.membershipId, ((MemberMessage) msg).getMembershipId());
+        }
+        return result;
+    }
+
+    @Override
+    public Serializable replyRequest(Serializable msg, final Member sender) {
+        if (!(msg instanceof MemberMessage)) return null;
+        MemberMessage memMsg = (MemberMessage) msg;
+        if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+            messageReceived(memMsg, sender);
+            memMsg.setMember(service.getLocalMember(true));
+            return memMsg;
+        } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+            messageReceived(memMsg, sender);
+            memMsg.setMember(service.getLocalMember(true));
+            return memMsg;
+        } else {
+            // other messages are ignored.
+            if (log.isInfoEnabled())
+                log.info("never happen");
+            return null;
+        }
+    }
+
+    @Override
+    public void leftOver(Serializable msg, Member sender) {
+        if (!(msg instanceof MemberMessage)) return;
+        MemberMessage memMsg = (MemberMessage) msg;
+        if (memMsg.getMsgtype() == MemberMessage.MSG_START) {
+            //TODO
+        } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) {
+            //TODO
+        } else {
+            // other messages are ignored.
+            if (log.isInfoEnabled())
+                log.info("never happen");
+        }
+    }
+
+    @Override
+    public void heartbeat() {
+        try {
+            ping();
+        } catch (ChannelException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    protected void ping() throws ChannelException {
+        // send ping
+        Member[] members = getAliveMembers(staticMembers.toArray(new Member[0]));
+        if (members.length == 0) return;
+        try {
+            MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_PING, service.getLocalMember(true));
+            Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout);
+            for (int i = 0; i < resp.length; i++) {
+                messageReceived(resp[i].getMessage(), resp[i].getSource());
+            }
+        } catch (ChannelException ce) {
+            // Handle known failed members
+            FaultyMember[] faultyMembers = ce.getFaultyMembers();
+            for (FaultyMember faultyMember : faultyMembers) {
+                memberDisappeared(faultyMember.getMember());
+            }
+            throw ce;
+        }
+        // expire
+        checkExpired();
+    }
+
+    protected void checkExpired() {
+        Member[] expired = membership.expire(expirationTime);
+        for (Member member : expired) {
+            membershipListener.memberDisappeared(member);
+        }
+    }
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
+    public void setMembershipService(MembershipService service) {
+        this.service = service;
+    }
+
+    public void setStaticMembers(ArrayList<StaticMember> staticMembers) {
+        this.staticMembers = staticMembers;
+    }
+
+    private Member[] getAliveMembers(Member[] members) {
+        List<Member> aliveMembers = new ArrayList<>();
+        for (Member member : members) {
+            try (Socket socket = new Socket()) {
+                InetAddress ia = InetAddress.getByAddress(member.getHost());
+                InetSocketAddress addr = new InetSocketAddress(ia, member.getPort());
+                socket.connect(addr, connectTimeout);
+                aliveMembers.add(member);
+            } catch (Exception x) {//no-op
+            }
+        }
+        return aliveMembers.toArray(new Member[0]);
+    }
+
+    // ------------------------------------------------------------------------------
+    // member message to send to and from other memberships
+    // ------------------------------------------------------------------------------
+    public static class MemberMessage implements Serializable {
+        private static final long serialVersionUID = 1L;
+        public static final int MSG_START = 1;
+        public static final int MSG_STOP = 2;
+        public static final int MSG_PING = 3;
+        private final int msgtype;
+        private final byte[] membershipId;
+        private Member member;
+
+        public MemberMessage(byte[] membershipId, int msgtype, Member member) {
+            this.membershipId = membershipId;
+            this.msgtype = msgtype;
+            this.member = member;
+        }
+
+        public int getMsgtype() {
+            return msgtype;
+        }
+
+        public byte[] getMembershipId() {
+            return membershipId;
+        }
+
+        public Member getMember() {
+            return member;
+        }
+
+        public void setMember(Member local) {
+            this.member = local;
+        }
+
+        @Override
+        public String toString() {
+            return super.toString();
+        }
+    }
+
+}
\ No newline at end of file

Propchange: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java?rev=1836707&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java (added)
+++ tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java Thu Jul 26 09:27:35 2018
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.membership;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipProvider;
+import org.apache.catalina.tribes.util.StringManager;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+public class StaticMembershipService extends MembershipServiceBase {
+    private static final Log log = LogFactory.getLog(StaticMembershipService.class);
+    protected static final StringManager sm = StringManager.getManager(Constants.Package);
+
+    protected final ArrayList<StaticMember> staticMembers = new ArrayList<>();
+    private StaticMember localMember;
+    private StaticMembershipProvider provider;
+
+    public StaticMembershipService() {
+        //default values
+        setDefaults(this.properties);
+    }
+
+    @Override
+    public void start(int level) throws Exception {
+        if (provider != null) {
+            provider.start(level);
+            return;
+        }
+        localMember.setServiceStartTime(System.currentTimeMillis());
+        localMember.setMemberAliveTime(100);
+        // build membership provider
+        if (provider == null) {
+            provider = buildMembershipProvider();
+        }
+        provider.start(level);
+        // TODO JMX register
+    }
+
+    protected StaticMembershipProvider buildMembershipProvider() throws Exception {
+        StaticMembershipProvider provider = new StaticMembershipProvider();
+        provider.setChannel(channel);
+        provider.setMembershipListener(this);
+        provider.setMembershipService(this);
+        provider.setStaticMembers(staticMembers);
+        properties.setProperty("membershipName", getMembershipName());
+        provider.init(properties);
+        return provider;
+    }
+
+    @Override
+    public void stop(int level) {
+        try {
+            if (provider != null && provider.stop(level)) {
+                // TODO JMX unregister
+                provider = null;
+                channel = null;;
+            }
+        } catch (Exception e) {
+            //TODO
+            log.error("stop failed", e);
+        }
+    }
+
+    @Override
+    public Member getLocalMember(boolean incAliveTime) {
+        if ( incAliveTime && localMember != null) {
+            localMember.setMemberAliveTime(System.currentTimeMillis()-localMember.getServiceStartTime());
+        }
+        return localMember;
+    }
+
+    @Override
+    public void setLocalMemberProperties(String listenHost, int listenPort, 
+            int securePort, int udpPort) {
+        try {
+            localMember.setHostname(listenHost);
+            localMember.setPort(listenPort);
+            localMember.setSecurePort(securePort);
+            localMember.setUdpPort(udpPort);
+            localMember.getData(true, true);
+        } catch (IOException x) {
+            throw new IllegalArgumentException(x);
+        }
+    }
+
+    @Override
+    public void setPayload(byte[] payload) {
+        // no-op
+    }
+
+    @Override
+    public void setDomain(byte[] domain) {
+        // no-op
+    }
+
+    @Override
+    public MembershipProvider getMembershipProvider() {
+        return provider;
+    }
+
+    public ArrayList<StaticMember> getStaticMembers() {
+        return staticMembers;
+    }
+
+    public void addStaticMember(StaticMember member) {
+        staticMembers.add(member);
+    }
+
+    public void removeStaticMember(StaticMember member) {
+        staticMembers.remove(member);
+    }
+
+    public void setLocalMember(StaticMember member) {
+        this.localMember = member;
+        localMember.setLocal(true);
+    }
+
+     public long getExpirationTime() {
+         String expirationTime = properties.getProperty("expirationTime");
+         return Long.parseLong(expirationTime);
+     }
+
+    public void setExpirationTime(long expirationTime) {
+        properties.setProperty("expirationTime", String.valueOf(expirationTime));
+    }
+
+     public int getConnectTimeout() {
+         String connectTimeout = properties.getProperty("connectTimeout");
+         return Integer.parseInt(connectTimeout);
+     }
+
+    public void setConnectTimeout(int connectTimeout) {
+        properties.setProperty("connectTimeout", String.valueOf(connectTimeout));
+    }
+
+    public long getRpcTimeout() {
+        String rpcTimeout = properties.getProperty("rpcTimeout");
+        return Long.parseLong(rpcTimeout);
+    }
+
+    public void setRpcTimeout(long rpcTimeout) {
+        properties.setProperty("rpcTimeout", String.valueOf(rpcTimeout));
+    }
+
+    @Override
+    public void setProperties(Properties properties) {
+        setDefaults(properties);
+        this.properties = properties;
+    }
+
+    protected void setDefaults(Properties properties) {
+        // default values
+        if (properties.getProperty("expirationTime") == null)
+            properties.setProperty("expirationTime","5000");
+        if (properties.getProperty("connectTimeout") == null)
+            properties.setProperty("connectTimeout","500");
+        if (properties.getProperty("rpcTimeout") == null)
+            properties.setProperty("rpcTimeout","3000");
+    }
+
+    private String getMembershipName() {
+        return channel.getName()+"-"+"StaticMembership";
+    }
+}
\ No newline at end of file

Propchange: tomcat/trunk/java/org/apache/catalina/tribes/membership/StaticMembershipService.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org


Re: svn commit: r1836707 - in /tomcat/trunk/java/org/apache/catalina/tribes/membership: StaticMembershipProvider.java StaticMembershipService.java

Posted by Keiichi Fujino <kf...@apache.org>.
2018-07-26 18:36 GMT+09:00 Mark Thomas <ma...@apache.org>:

> On 26/07/2018 11:27, kfujino@apache.org wrote:
>
>> Author: kfujino
>> Date: Thu Jul 26 09:27:35 2018
>> New Revision: 1836707
>>
>> URL: http://svn.apache.org/viewvc?rev=1836707&view=rev
>> Log:
>> Add New Static Membership Service implementations.
>> - initial implementaion that remain a lot of TODOs.
>>
>
> I appreciate that this is a work in progress. Can you explain the
> differences / benefits / disadvantages of this vs. the
> StaticMembershipInterceptor?
>
> I'd like to understand when I should use one or the other.
>
>
The main motivations for implementing the new static membership are,

I would like to implement this as a channel membership service instead of
implementing it with the channel interceptors.
Setting multiple interceptors is complicated in order to configure static
membership.
It should work with only StaticMembershipService setting.
In other words, it should not depend on interceptors such as
TcpFailureDetector and TcpPingInterceptor.

Also, I wanted to unify the implementation of Tomcat clustering's
membership service.
Currently, McastService and StaticMembershipService are implemented in a
unified way.
If someone implements another membership service (like cloud membership?,
for example)
it can also use MembershipServiceBase and MembershipProviderBase in the
same way.

Thanks.


> Thanks,
>
> Mark
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
> For additional commands, e-mail: dev-help@tomcat.apache.org
>
> --
> Keiichi.Fujino
> <de...@tomcat.apache.org>
> <de...@tomcat.apache.org>
>

Re: svn commit: r1836707 - in /tomcat/trunk/java/org/apache/catalina/tribes/membership: StaticMembershipProvider.java StaticMembershipService.java

Posted by Mark Thomas <ma...@apache.org>.
On 26/07/2018 11:27, kfujino@apache.org wrote:
> Author: kfujino
> Date: Thu Jul 26 09:27:35 2018
> New Revision: 1836707
> 
> URL: http://svn.apache.org/viewvc?rev=1836707&view=rev
> Log:
> Add New Static Membership Service implementations.
> - initial implementaion that remain a lot of TODOs.

I appreciate that this is a work in progress. Can you explain the 
differences / benefits / disadvantages of this vs. the 
StaticMembershipInterceptor?

I'd like to understand when I should use one or the other.

Thanks,

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org