You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by sebb <se...@gmail.com> on 2009/04/12 00:00:35 UTC

Re: svn commit: r763726 - in /tomcat/trunk/java/org/apache/catalina/ha/backend: HeartbeatListener.java MultiCastSender.java Proxy.java Sender.java TcpSender.java

On 09/04/2009, jfclere@apache.org <jf...@apache.org> wrote:
> Author: jfclere
>  Date: Thu Apr  9 16:32:04 2009
>  New Revision: 763726
>
>  URL: http://svn.apache.org/viewvc?rev=763726&view=rev
>  Log:
>  Add the Tcp code... Still need the code in httpd-trunk to test it.
>
>  Added:
>     tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java
>     tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java

Should add SVN property svn:eol-style=native for such files...

>  Modified:
>     tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java
>     tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java
>     tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java
>
>  Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java
>  URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java?rev=763726&r1=763725&r2=763726&view=diff
>  ==============================================================================
>  --- tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java (original)
>  +++ tomcat/trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java Thu Apr  9 16:32:04 2009
>  @@ -29,11 +29,6 @@
>
>   import org.apache.catalina.connector.Connector;
>
>  -import java.net.MulticastSocket;
>  -import java.net.InetAddress;
>  -import java.net.DatagramPacket;
>  -import java.io.UnsupportedEncodingException;
>  -
>   import org.apache.tomcat.util.modeler.Registry;
>
>   /*
>  @@ -66,6 +61,20 @@
>      public void setTtl(int ttl) { this.ttl = ttl; }
>      public int getTtl() { return ttl; }
>
>  +    /**
>  +     * Proxy list, format "address:port,address:port".
>  +     */
>  +    protected String proxyList = null;
>  +    public String getProxyList() { return proxyList; }
>  +    public void setProxyList(String proxyList) { this.proxyList = proxyList; }
>  +
>  +    /**
>  +     * URL prefix.
>  +     */
>  +    protected String proxyURL = "/HeartbeatListener";
>  +    public String getProxyURL() { return proxyURL; }
>  +    public void setProxyURL(String proxyURL) { this.proxyURL = proxyURL; }
>  +
>      private CollectedInfo coll = null;
>
>      private Sender sender = null;
>  @@ -77,8 +86,18 @@
>          Object source = event.getLifecycle();
>          if (Lifecycle.PERIODIC_EVENT.equals(event.getType())) {
>              if (sender == null) {
>  -                sender = new MultiCastSender();
>  -                sender.init(this);
>  +                if (proxyList == null)
>  +                    sender = new MultiCastSender();
>  +                else
>  +                    sender = new TcpSender();
>  +
>  +                try {
>  +                    sender.init(this);
>  +                } catch (Exception ex) {
>  +                    log.error("Unable to initialize Sender: " + ex);
>  +                    sender = null;
>  +                    return;
>  +                }
>              }
>
>              /* Read busy and ready */
>
>  Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java
>  URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java?rev=763726&r1=763725&r2=763726&view=diff
>  ==============================================================================
>  --- tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java (original)
>  +++ tomcat/trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java Thu Apr  9 16:32:04 2009
>  @@ -40,7 +40,7 @@
>      MulticastSocket s = null;
>      InetAddress group = null;
>
>  -    public void init(HeartbeatListener config) {
>  +    public void init(HeartbeatListener config) throws Exception {
>          this.config = config;
>      }
>
>
>  Added: tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java
>  URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java?rev=763726&view=auto
>  ==============================================================================
>  --- tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java (added)
>  +++ tomcat/trunk/java/org/apache/catalina/ha/backend/Proxy.java Thu Apr  9 16:32:04 2009
>  @@ -0,0 +1,34 @@
>  +/*
>  + * Licensed to the Apache Software Foundation (ASF) under one or more
>  + * contributor license agreements.  See the NOTICE file distributed with
>  + * this work for additional information regarding copyright ownership.
>  + * The ASF licenses this file to You under the Apache License, Version 2.0
>  + * (the "License"); you may not use this file except in compliance with
>  + * the License.  You may obtain a copy of the License at
>  + *
>  + *      http://www.apache.org/licenses/LICENSE-2.0
>  + *
>  + * Unless required by applicable law or agreed to in writing, software
>  + * distributed under the License is distributed on an "AS IS" BASIS,
>  + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  + * See the License for the specific language governing permissions and
>  + * limitations under the License.
>  + */
>  +
>  +
>  +package org.apache.catalina.ha.backend;
>  +
>  +import java.net.InetAddress;
>  +
>  +/*
>  + * This class represents a front-end httpd server.
>  + *
>  + */
>  +public class Proxy {
>  +
>  +  protected enum State { OK, ERROR, DOWN };
>  +
>  +  public InetAddress address = null;
>  +  public int port = 80;
>  +  public State state = State.OK;
>  +}
>
>  Modified: tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java
>  URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java?rev=763726&r1=763725&r2=763726&view=diff
>  ==============================================================================
>  --- tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java (original)
>  +++ tomcat/trunk/java/org/apache/catalina/ha/backend/Sender.java Thu Apr  9 16:32:04 2009
>  @@ -27,7 +27,7 @@
>    /**
>     * Set the configuration parameters
>     */
>  -  public void init(HeartbeatListener config);
>  +  public void init(HeartbeatListener config) throws Exception;
>
>    /**
>     * Send the message to the proxies
>
>  Added: tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java
>  URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java?rev=763726&view=auto
>  ==============================================================================
>  --- tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java (added)
>  +++ tomcat/trunk/java/org/apache/catalina/ha/backend/TcpSender.java Thu Apr  9 16:32:04 2009
>  @@ -0,0 +1,196 @@
>  +/*
>  + * Licensed to the Apache Software Foundation (ASF) under one or more
>  + * contributor license agreements.  See the NOTICE file distributed with
>  + * this work for additional information regarding copyright ownership.
>  + * The ASF licenses this file to You under the Apache License, Version 2.0
>  + * (the "License"); you may not use this file except in compliance with
>  + * the License.  You may obtain a copy of the License at
>  + *
>  + *      http://www.apache.org/licenses/LICENSE-2.0
>  + *
>  + * Unless required by applicable law or agreed to in writing, software
>  + * distributed under the License is distributed on an "AS IS" BASIS,
>  + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  + * See the License for the specific language governing permissions and
>  + * limitations under the License.
>  + */
>  +
>  +
>  +package org.apache.catalina.ha.backend;
>  +
>  +import org.apache.juli.logging.Log;
>  +import org.apache.juli.logging.LogFactory;
>  +
>  +import java.io.BufferedReader;
>  +import java.io.BufferedWriter;
>  +import java.io.IOException;
>  +import java.io.InputStreamReader;
>  +import java.io.OutputStreamWriter;
>  +import java.net.InetAddress;
>  +import java.net.Socket;
>  +import java.net.InetAddress;
>  +import java.io.UnsupportedEncodingException;
>  +import java.util.StringTokenizer;
>  +
>  +/*
>  + * Sender to proxies using multicast socket.
>  + */
>  +public class TcpSender
>  +    implements Sender {
>  +
>  +    private static Log log = LogFactory.getLog(HeartbeatListener.class);
>  +
>  +    HeartbeatListener config = null;
>  +
>  +    /**
>  +     * Proxies.
>  +     */
>  +    protected Proxy[] proxies = null;
>  +
>  +
>  +    /**
>  +     * Active connections.
>  +     */
>  +
>  +    protected Socket[] connections = null;
>  +    protected BufferedReader[] connectionReaders = null;
>  +    protected BufferedWriter[] connectionWriters = null;
>  +
>  +
>  +    public void init(HeartbeatListener config) throws Exception {
>  +        this.config = config;
>  +        StringTokenizer tok = new StringTokenizer(config.getProxyList(), ",");
>  +        proxies = new Proxy[tok.countTokens()];
>  +        int i = 0;
>  +        while (tok.hasMoreTokens()) {
>  +            String token = tok.nextToken().trim();
>  +            int pos = token.indexOf(':');
>  +            if (pos <=0)
>  +                throw new Exception("bad ProxyList");
>  +            proxies[i] = new Proxy();
>  +            proxies[i].port = Integer.parseInt(token.substring(pos + 1));
>  +            try {
>  +                 proxies[i].address = InetAddress.getByName(token.substring(0, pos));
>  +            } catch (Exception e) {
>  +                throw new Exception("bad ProxyList");
>  +            }
>  +            i++;
>  +        }
>  +        connections = new Socket[proxies.length];
>  +        connectionReaders = new BufferedReader[proxies.length];
>  +        connectionWriters = new BufferedWriter[proxies.length];
>  +
>  +    }
>  +
>  +    public int send(String mess) throws Exception {
>  +        if (connections == null) {
>  +            log.error("Not initialized");
>  +            return -1;
>  +        }
>  +        String requestLine = "POST " + config.getProxyURL() + " HTTP/1.0";
>  +
>  +        for (int i = 0; i < connections.length; i++) {
>  +            if (connections[i] == null) {
>  +                try {
>  +                    connections[i] = new Socket(proxies[i].address, proxies[i].port);
>  +                    connectionReaders[i] = new BufferedReader(new InputStreamReader(connections[i].getInputStream()));
>  +                    connectionWriters[i] = new BufferedWriter(new OutputStreamWriter(connections[i].getOutputStream()));
>  +                } catch (Exception ex) {
>  +                    log.error("Unable to connect to proxy: " + ex);
>  +                    close(i);
>  +                }
>  +            }
>  +            if (connections[i] == null)
>  +                continue; // try next proxy in the list
>  +            BufferedWriter writer = connectionWriters[i];
>  +            try {
>  +                writer.write(requestLine);
>  +                writer.write("\r\n");
>  +                writer.write("Content-Length: " + mess.length() + "\r\n");
>  +                writer.write("User-Agent: HeartbeatListener/1.0\r\n");
>  +                writer.write("Connection: Keep-Alive\r\n");
>  +                writer.write("\r\n");
>  +                writer.write(mess);
>  +                writer.write("\r\n");
>  +                writer.flush();
>  +            } catch (Exception ex) {
>  +                log.error("Unable to send collected load information to proxy: " + ex);
>  +                close(i);
>  +            }
>  +            if (connections[i] == null)
>  +                continue; // try next proxy in the list
>  +
>  +            /* Read httpd answer */
>  +            String responseStatus = connectionReaders[i].readLine();
>  +            if (responseStatus == null) {
>  +                log.error("Unable to read response from proxy");
>  +                close(i);
>  +                continue;
>  +            } else {
>  +                responseStatus = responseStatus.substring(responseStatus.indexOf(' ') + 1, responseStatus.indexOf(' ', responseStatus.indexOf(' ') + 1));
>  +                int status = Integer.parseInt(responseStatus);
>  +                if (status != 200) {
>  +                    log.error("Status is " + status);
>  +                    close(i);
>  +                    continue;
>  +                }
>  +
>  +                // read all the headers.
>  +                String header = connectionReaders[i].readLine();
>  +                int contentLength = 0;
>  +                while (!"".equals(header)) {
>  +                    int colon = header.indexOf(':');
>  +                    String headerName = header.substring(0, colon).trim();
>  +                    String headerValue = header.substring(colon + 1).trim();
>  +                    if ("content-length".equalsIgnoreCase(headerName)) {
>  +                        contentLength = Integer.parseInt(headerValue);
>  +                    }
>  +                }
>  +                if (contentLength > 0) {
>  +                    char[] buf = new char[512];
>  +                    while (contentLength > 0) {
>  +                        int thisTime = (contentLength > buf.length) ? buf.length : contentLength;
>  +                        int n = connectionReaders[i].read(buf, 0, thisTime);
>  +                        if (n <= 0) {
>  +                            log.error("Read content failed");
>  +                            close(i);
>  +                            break;
>  +                        } else {
>  +                            contentLength -= n;
>  +                        }
>  +                   }
>  +                }
>  +            }
>  +
>  +        }
>  +
>  +        return 0;
>  +    }
>  +
>  +    /**
>  +     * Close connection.
>  +     */
>  +    protected void close(int i) {
>  +        try {
>  +            if (connectionReaders[i] != null) {
>  +                connectionReaders[i].close();
>  +            }
>  +        } catch (IOException e) {
>  +        }
>  +        connectionReaders[i] = null;
>  +        try {
>  +            if (connectionWriters[i] != null) {
>  +                connectionWriters[i].close();
>  +            }
>  +        } catch (IOException e) {
>  +        }
>  +        connectionWriters[i] = null;
>  +        try {
>  +            if (connections[i] != null) {
>  +                connections[i].close();
>  +            }
>  +        } catch (IOException e) {
>  +        }
>  +        connections[i] = null;
>  +    }
>  +}
>
>
>
>  ---------------------------------------------------------------------
>  To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
>  For additional commands, e-mail: dev-help@tomcat.apache.org
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org