You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/03/25 23:12:32 UTC

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitter.java

pero        2005/03/25 14:12:32

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        ReplicationTransmitter.java
  Log:
  Refactor Code
  Add compress transfer handling
  Add api docs
  
  Revision  Changes    Path
  1.22      +444 -250  jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
  
  Index: ReplicationTransmitter.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
  retrieving revision 1.21
  retrieving revision 1.22
  diff -u -r1.21 -r1.22
  --- ReplicationTransmitter.java	15 Feb 2005 09:32:39 -0000	1.21
  +++ ReplicationTransmitter.java	25 Mar 2005 22:12:31 -0000	1.22
  @@ -16,6 +16,7 @@
   
   package org.apache.catalina.cluster.tcp;
   
  +import java.io.IOException;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.Map;
  @@ -24,18 +25,22 @@
   import javax.management.ObjectName;
   
   import org.apache.catalina.cluster.ClusterSender;
  -import org.apache.catalina.cluster.Constants;
   import org.apache.catalina.cluster.Member;
   import org.apache.catalina.cluster.io.XByteBuffer;
   import org.apache.catalina.util.StringManager;
   import org.apache.tomcat.util.IntrospectionUtils;
   
  -
   /**
  + * Transmit messages to other cluster members; create sender from replicationMode
  + * type 
  + * FIXME i18n log messages
  + * FIXME compress data depends on message type and size 
  + * FIXME send very big messages at some block see FarmWarDeployer!
  + * TODO pause and resume senders
  + * 
    * @author Peter Rossbach
    * @author Filip Hanik
  - * @version 1.2
  - * 
  + * @version $Revision$ $Date$
    */
   public class ReplicationTransmitter implements ClusterSender {
       private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
  @@ -44,37 +49,69 @@
       /**
        * The descriptive information about this implementation.
        */
  -    private static final String info = "ReplicationTransmitter/1.2";
  +    private static final String info = "ReplicationTransmitter/1.3";
   
       /**
        * The string manager for this package.
        */
       protected StringManager sm = StringManager.getManager(Constants.Package);
   
  -    private java.util.HashMap map = new java.util.HashMap();
  +    private Map map = new HashMap();
   
       public ReplicationTransmitter() {
       }
   
  +    /**
  +     * number of transmitted messages
  +     */
       private long nrOfRequests = 0;
   
  +    /**
  +     * number of transmitted bytes
  +     */
       private long totalBytes = 0;
   
  +    private long failureCounter = 0;
  +
  +    /**
  +     * current sender replication mode
  +     */
       private String replicationMode;
   
  +    /**
  +     * sender default ackTimeout
  +     */
       private long ackTimeout = 15000; //15 seconds by default
   
  -    private boolean waitForAck = true ;
  -    
  -    private SimpleTcpCluster cluster;
  +    /**
  +     * enabled wait for ack
  +     */
  +    private boolean waitForAck = true;
   
  -    private ObjectName objectName;
  +    /**
  +     * auto-connect the sender when the next message is sent
  +     */
  +    private boolean autoConnect = true;
   
  -    private boolean autoConnect = true ;
  +    /**
  +     * Compress message data bytes
  +     */
  +    private boolean compress = true;
   
  +    /**
  +     * dynamic sender <code>properties</code>
  +     */
       private Map properties = new HashMap();
   
  -    private long failureCounter = 0 ;
  +    /**
  +     * my cluster
  +     */
  +    private SimpleTcpCluster cluster;
  +
  +    /**
  +     * Transmitter Mbean name
  +     */
  +    private ObjectName objectName;
   
       // ------------------------------------------------------------- Properties
   
  @@ -89,27 +126,6 @@
   
       }
   
  -    private synchronized void addStats(int length) {
  -        nrOfRequests++;
  -        totalBytes += length;
  -        if (log.isDebugEnabled() &&
  -           (nrOfRequests % 100) == 0) {
  -                log.debug("Nr of bytes sent=" + totalBytes + " over "
  -                        + nrOfRequests + " ==" + (totalBytes / nrOfRequests)
  -                        + " bytes/request");
  -        }
  -
  -    }
  -    
  -    /*
  -     * Reset sender statistics
  -     */
  -    public synchronized void resetStatistics() {
  -        nrOfRequests = 0;
  -        totalBytes = 0;
  -        failureCounter = 0;
  -    }
  -
       /**
        * @return Returns the nrOfRequests.
        */
  @@ -124,6 +140,28 @@
           return totalBytes;
       }
   
  +    /**
  +     * @return Returns the failureCounter.
  +     */
  +    public long getFailureCounter() {
  +        return failureCounter;
  +    }
  +
  +    /**
  +     * current replication mode
  +     * 
  +     * @return
  +     */
  +    public String getReplicationMode() {
  +        return replicationMode;
  +    }
  +
  +    /**
  +     * set replication mode (pooled, synchronous, asynchronous, fastasyncqueue)
  +     * 
  +     * @see IDataSenderFactory#validateMode(String)
  +     * @param mode
  +     */
       public void setReplicationMode(String mode) {
           String msg = IDataSenderFactory.validateMode(mode);
           if (msg == null) {
  @@ -134,94 +172,195 @@
               throw new IllegalArgumentException(msg);
   
       }
  -    
  -    public synchronized void add(Member member) {
  -        try {
  -            String key = getKey(member);
  -            if (!map.containsKey(key)) {
  -                IDataSender sender = IDataSenderFactory.getIDataSender(
  -                        replicationMode, member);
  -                transferSenderProperty(sender);
  -                map.put(key, sender);
  -                registerSenderMBean(member, sender);
  -            }
  -        } catch (java.io.IOException x) {
  -            log.error("Unable to create and add a IDataSender object.", x);
  -        }
  -    }//add
   
       /**
  -     * Transfer all properties from transmitter to concrete sender
  -     * @param sender
  +     * Transmitter ObjectName
  +     * 
  +     * @param name
        */
  -    protected void transferSenderProperty(IDataSender sender) {
  -        for (Iterator iter = getPropertyNames(); iter.hasNext();) {
  -            String pkey = (String) iter.next();
  -            Object value = getProperty(pkey);
  -            IntrospectionUtils.setProperty(sender, pkey, value.toString());                    
  -        }
  +    public void setObjectName(ObjectName name) {
  +        objectName = name;
       }
   
  -    protected String getKey(Member member) {
  -        return member.getHost() + ":" + member.getPort();
  +    public ObjectName getObjectName() {
  +        return objectName;
       }
   
  -    public synchronized void remove(Member member) {
  -        String key = getKey(member);
  -        IDataSender toberemoved = (IDataSender) map.get(key);
  -        if (toberemoved == null)
  -            return;
  -        unregisterSenderMBean(toberemoved);
  -        toberemoved.disconnect();
  -        map.remove(key);
  -       
  +    /**
  +     * @return Returns the compress.
  +     */
  +    public boolean isCompress() {
  +        return compress;
       }
   
  -    protected void unregisterSenderMBean(IDataSender sender) {
  -        try {
  -            MBeanServer mserver = cluster.getMBeanServer();
  -            if (mserver != null) {
  -                mserver.unregisterMBean(getSenderObjectName(sender));
  -            }
  -        } catch (Exception e) {
  -            log.warn(e);
  -        }
  +    /**
  +     * @param compressMessageData
  +     *            The compress flag to set.
  +     */
  +    public void setCompress(boolean compressMessageData) {
  +        this.compress = compressMessageData;
       }
   
  -    protected void registerSenderMBean(Member member, IDataSender sender) {
  -        if (member != null && cluster != null) {
  -            try {
  -                MBeanServer mserver = cluster.getMBeanServer();
  -                ObjectName senderName = getSenderObjectName(sender);
  -                if (mserver.isRegistered(senderName)) {
  -                    if (log.isWarnEnabled())
  -                        log.warn(sm.getString(
  -                                "cluster.mbean.register.allready", senderName));
  -                    return;
  -                }
  -                mserver.registerMBean(cluster.getManagedBean(sender),
  -                        senderName);
  -            } catch (Exception e) {
  -                log.warn(e);
  -            }
  -        }
  +    /**
  +     * @return Returns the autoConnect.
  +     */
  +    public boolean isAutoConnect() {
  +        return autoConnect;
       }
   
  -    protected ObjectName getSenderObjectName(IDataSender sender) {
  -        ObjectName senderName = null;
  -        try {
  -            ObjectName clusterName = cluster.getObjectName();
  -            MBeanServer mserver = cluster.getMBeanServer();
  -            senderName = new ObjectName(clusterName.getDomain()
  -                    + ":type=IDataSender,host="
  -                    + clusterName.getKeyProperty("host") + ",senderAddress="
  -                    + sender.getAddress().getHostAddress() + ",senderPort=" + sender.getPort());
  -        } catch (Exception e) {
  -            log.warn(e);
  +    /**
  +     * @param autoConnect
  +     *            The autoConnect to set.
  +     */
  +    public void setAutoConnect(boolean autoConnect) {
  +        this.autoConnect = autoConnect;
  +        setProperty("autoConnect", String.valueOf(autoConnect));
  +
  +    }
  +
  +    /**
  +     * @return
  +     */
  +    public long getAckTimeout() {
  +        return ackTimeout;
  +    }
  +
  +    /**
  +     * @param ackTimeout
  +     */
  +    public void setAckTimeout(long ackTimeout) {
  +        this.ackTimeout = ackTimeout;
  +        setProperty("ackTimeout", String.valueOf(ackTimeout));
  +    }
  +
  +    /**
  +     * @return Returns the waitForAck.
  +     */
  +    public boolean isWaitForAck() {
  +        return waitForAck;
  +    }
  +
  +    /**
  +     * @param waitForAck
  +     *            The waitForAck to set.
  +     */
  +    public void setWaitForAck(boolean waitForAck) {
  +        this.waitForAck = waitForAck;
  +        setProperty("waitForAck", String.valueOf(waitForAck));
  +    }
  +
  +    /*
  +     * configured in cluster
  +     * 
  +     * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster)
  +     */
  +    public void setCatalinaCluster(SimpleTcpCluster cluster) {
  +        this.cluster = cluster;
  +
  +    }
  +
  +    /**
  +     * @return
  +     * @deprecated since version 5.5.7
  +     */
  +    public boolean getIsSenderSynchronized() {
  +        return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
  +                || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
  +    }
  +
  +    // ------------------------------------------------------------- dynamic
  +    // sender property handling
  +
  +    /**
  +     * set config attributes with reflect
  +     * 
  +     * @param name
  +     * @param value
  +     */
  +    public void setProperty(String name, Object value) {
  +        if (log.isTraceEnabled())
  +            log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
  +                    value));
  +
  +        properties.put(name, value);
  +    }
  +
  +    /**
  +     * get current config
  +     * 
  +     * @param key
  +     * @return
  +     */
  +    public Object getProperty(String key) {
  +        if (log.isTraceEnabled())
  +            log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
  +        return properties.get(key);
  +    }
  +
  +    /**
  +     * Get all properties keys
  +     * 
  +     * @return
  +     */
  +    public Iterator getPropertyNames() {
  +        return properties.keySet().iterator();
  +    }
  +
  +    /**
  +     * remove a configured property.
  +     * 
  +     * @param key
  +     */
  +    public void removeProperty(String key) {
  +        properties.remove(key);
  +    }
  +
  +    // ------------------------------------------------------------- public
  +
  +    /**
  +     * Send data to one member
  +     * 
  +     * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
  +     *      byte[], org.apache.catalina.cluster.Member)
  +     */
  +    public void sendMessage(String sessionId, byte[] indata, Member member)
  +            throws java.io.IOException {
  +        byte[] data = convertSenderData(indata);
  +        String key = getKey(member);
  +        IDataSender sender = (IDataSender) map.get(key);
  +        sendMessageData(sessionId, data, sender);
  +    }
  +
  +    /**
  +     * send message to all senders (broadcast)
  +     * 
  +     * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
  +     *      byte[])
  +     */
  +    public void sendMessage(String sessionId, byte[] indata)
  +            throws java.io.IOException {
  +        IDataSender[] senders = getSenders();
  +        byte[] data = convertSenderData(indata);
  +        for (int i = 0; i < senders.length; i++) {
  +
  +            IDataSender sender = senders[i];
  +            try {
  +                sendMessageData(sessionId, data, sender);
  +            } catch (Exception x) {
  +
  +                if (!sender.getSuspect())
  +                    log.warn("Unable to send replicated message to " + sender
  +                            + ", is server down?", x);
  +                sender.setSuspect(true);
  +            }
           }
  -        return senderName;
       }
   
  +    /**
  +     * start the sender and register transmitter mbean
  +     * 
  +     * @see org.apache.catalina.cluster.ClusterSender#start()
  +     */
       public void start() throws java.io.IOException {
           if (cluster != null) {
               ObjectName clusterName = cluster.getObjectName();
  @@ -248,14 +387,11 @@
   
       }
   
  -    public void setObjectName(ObjectName name) {
  -        objectName = name;
  -    }
  -
  -    public ObjectName getObjectName() {
  -        return objectName;
  -    }
  -
  +    /*
  +     * stop the sender and deregister mbeans (transmitter, senders)
  +     * 
  +     * @see org.apache.catalina.cluster.ClusterSender#stop()
  +     */
       public synchronized void stop() {
           Iterator i = map.entrySet().iterator();
           while (i.hasNext()) {
  @@ -279,192 +415,250 @@
   
       }
   
  +    /**
  +     * get all current senders
  +     * 
  +     * @return
  +     */
       public IDataSender[] getSenders() {
  -        java.util.Iterator i = map.entrySet().iterator();
  -        java.util.Vector v = new java.util.Vector();
  -        while (i.hasNext()) {
  -            IDataSender sender = (IDataSender) ((java.util.Map.Entry) i.next())
  -                    .getValue();
  +        java.util.Iterator iter = map.entrySet().iterator();
  +        IDataSender[] array = new IDataSender[map.size()];
  +        int i = 0;
  +        while (iter.hasNext()) {
  +            IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
  +                    .next()).getValue();
               if (sender != null)
  -                v.addElement(sender);
  +                array[i] = sender;
  +            i++;
           }
  -        IDataSender[] result = new IDataSender[v.size()];
  -        v.copyInto(result);
  -        return result;
  +        return array;
       }
   
       /**
  -     * Send message to concrete sender. If autoConnect is true, check is connection broken 
  -     * and the reconnect the complete sender.
  -     * <ul>
  -     * <li>failure the suspect flag is set true. After successfully
  -     * sending the suspect flag is set to false.</li>
  -     * <li>Stats is only update after sussesfull sending</li>
  -     * </ul>
  +     * get all current senders
        * 
  -     * @param sessionId Unique Message Id
  -     * @param data message Data
  -     * @param sender concrete message sender
  -     * @throws java.io.IOException
  +     * @return
        */
  -    protected void sendMessageData(String sessionId, byte[] data,
  -            IDataSender sender) throws java.io.IOException {
  -        if (sender == null)
  -            throw new java.io.IOException(
  -                    "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
  -        try {
  -            if (autoConnect  && !sender.isConnected())
  -                sender.connect();
  -            sender.sendMessage(sessionId, data);
  -            sender.setSuspect(false);
  -            addStats(data.length);
  -        } catch (Exception x) {
  -            if (log.isWarnEnabled()) {
  -                if (!sender.getSuspect()) {
  -                    log
  -                            .warn(
  -                                    "Unable to send replicated message, is server down?",
  -                                    x);
  -                }
  -            }
  -            sender.setSuspect(true);
  -            failureCounter++ ;
  +    public ObjectName[] getSenderObjectNames() {
  +        java.util.Iterator iter = map.entrySet().iterator();
  +        ObjectName array[] = new ObjectName[map.size()];
  +        int i = 0;
  +        while (iter.hasNext()) {
  +            IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
  +                    .next()).getValue();
  +            if (sender != null)
  +                array[i] = getSenderObjectName(sender);
  +            i++;
           }
  -
  +        return array;
       }
   
  -    public void sendMessage(String sessionId, byte[] indata, Member member)
  -            throws java.io.IOException {
  -        byte[] data = XByteBuffer.createDataPackage(indata);
  -        String key = getKey(member);
  -        IDataSender sender = (IDataSender) map.get(key);
  -        sendMessageData(sessionId, data, sender);
  +    /*
  +     * Reset sender statistics
  +     */
  +    public synchronized void resetStatistics() {
  +        nrOfRequests = 0;
  +        totalBytes = 0;
  +        failureCounter = 0;
       }
   
  -    public void sendMessage(String sessionId, byte[] indata)
  -            throws java.io.IOException {
  -        IDataSender[] senders = getSenders();
  -        byte[] data = XByteBuffer.createDataPackage(indata);
  -        for (int i = 0; i < senders.length; i++) {
  -
  -            IDataSender sender = senders[i];
  -            try {
  -                sendMessageData(sessionId, data, sender);
  -            } catch (Exception x) {
  -
  -                if (!sender.getSuspect())
  -                    log.warn("Unable to send replicated message to " + sender
  -                            + ", is server down?", x);
  -                sender.setSuspect(true);
  +    /*
  +     * add new cluster member and create sender (see replicationMode); transfer
  +     * current properties to sender
  +     * 
  +     * @see org.apache.catalina.cluster.ClusterSender#add(org.apache.catalina.cluster.Member)
  +     */
  +    public synchronized void add(Member member) {
  +        try {
  +            String key = getKey(member);
  +            if (!map.containsKey(key)) {
  +                IDataSender sender = IDataSenderFactory.getIDataSender(
  +                        replicationMode, member);
  +                transferSenderProperty(sender);
  +                map.put(key, sender);
  +                registerSenderMBean(member, sender);
               }
  -        }//while
  -    }
  -
  -    public String getReplicationMode() {
  -        return replicationMode;
  +        } catch (java.io.IOException x) {
  +            log.error("Unable to create and add a IDataSender object.", x);
  +        }
       }
   
       /**
  -     * @return
  -     * @deprecated since Version 1.1
  +     * remove sender from transmitter. ( deregister mbean and disconnect sender )
  +     * 
  +     * @see org.apache.catalina.cluster.ClusterSender#remove(org.apache.catalina.cluster.Member)
        */
  -    public boolean getIsSenderSynchronized() {
  -        return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
  -                || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
  +    public synchronized void remove(Member member) {
  +        String key = getKey(member);
  +        IDataSender toberemoved = (IDataSender) map.get(key);
  +        if (toberemoved == null)
  +            return;
  +        unregisterSenderMBean(toberemoved);
  +        toberemoved.disconnect();
  +        map.remove(key);
  +
       }
   
  +    // ------------------------------------------------------------- protected
  +
       /**
  -     * @return Returns the autoConnect.
  +     * count number of requests and transferred bytes. Log stats every 100 requests
  +     * 
  +     * @param length
        */
  -    public boolean isAutoConnect() {
  -        return autoConnect;
  +    protected synchronized void addStats(int length) {
  +        nrOfRequests++;
  +        totalBytes += length;
  +        if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
  +            log.debug("Nr of bytes sent=" + totalBytes + " over "
  +                    + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
  +                    + " bytes/request; failures=" + failureCounter);
  +        }
  +
       }
  +
       /**
  -     * @param autoConnect The autoConnect to set.
  +     * Transfer all properties from transmitter to concrete sender
  +     * 
  +     * @param sender
        */
  -    public void setAutoConnect(boolean autoConnect) {
  -        this.autoConnect = autoConnect;
  -        setProperty("autoConnect", String.valueOf(autoConnect));
  -        
  -    }
  - 
  -    public long getAckTimeout() {
  -        return ackTimeout;
  -    }
  -
  -    public void setAckTimeout(long ackTimeout) {
  -        this.ackTimeout = ackTimeout;
  -        setProperty("ackTimeout", String.valueOf(ackTimeout));
  +    protected void transferSenderProperty(IDataSender sender) {
  +        for (Iterator iter = getPropertyNames(); iter.hasNext();) {
  +            String pkey = (String) iter.next();
  +            Object value = getProperty(pkey);
  +            IntrospectionUtils.setProperty(sender, pkey, value.toString());
  +        }
       }
   
       /**
  -     * @return Returns the waitForAck.
  +     * set unique key to find sender
  +     * 
  +     * @param member
  +     * @return concat member.host:member.port
        */
  -    public boolean isWaitForAck() {
  -        return waitForAck;
  +    protected String getKey(Member member) {
  +        return member.getHost() + ":" + member.getPort();
       }
  +
       /**
  -     * @param waitForAck The waitForAck to set.
  -     */
  -    public void setWaitForAck(boolean waitForAck) {
  -        this.waitForAck = waitForAck;
  -        setProperty("waitForAck", String.valueOf(waitForAck));
  -   }
  -    
  -    /*
  -     * (non-Javadoc)
  +     * unregister sender MBean
        * 
  -     * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster)
  +     * @see #getSenderObjectName(IDataSender)
  +     * @param sender
        */
  -    public void setCatalinaCluster(SimpleTcpCluster cluster) {
  -        this.cluster = cluster;
  -
  +    protected void unregisterSenderMBean(IDataSender sender) {
  +        try {
  +            MBeanServer mserver = cluster.getMBeanServer();
  +            if (mserver != null) {
  +                mserver.unregisterMBean(getSenderObjectName(sender));
  +            }
  +        } catch (Exception e) {
  +            log.warn(e);
  +        }
       }
   
  -    /** 
  -     * set config attributes with reflect 
  -     * @param name
  -     * @param value
  +    /**
  +     * register MBean and check whether it already exists (big problem!)
  +     * 
  +     * @param member
  +     * @param sender
        */
  -    public void setProperty( String name, Object value ) {
  -        if( log.isTraceEnabled())
  -            log.trace(sm.getString("ReplicationTransmitter.setProperty", name, value));
  -
  -        properties.put(name, value);
  +    protected void registerSenderMBean(Member member, IDataSender sender) {
  +        if (member != null && cluster != null) {
  +            try {
  +                MBeanServer mserver = cluster.getMBeanServer();
  +                ObjectName senderName = getSenderObjectName(sender);
  +                if (mserver.isRegistered(senderName)) {
  +                    if (log.isWarnEnabled())
  +                        log.warn(sm.getString(
  +                                "cluster.mbean.register.allready", senderName));
  +                    return;
  +                }
  +                mserver.registerMBean(cluster.getManagedBean(sender),
  +                        senderName);
  +            } catch (Exception e) {
  +                log.warn(e);
  +            }
  +        }
       }
   
       /**
  -     * get current config
  -     * @param key
  +     * build sender ObjectName (
  +     * engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" )
  +     * 
  +     * @param sender
        * @return
        */
  -    public Object getProperty( String key ) {
  -        if( log.isTraceEnabled())
  -            log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
  -        return properties.get(key);
  +    protected ObjectName getSenderObjectName(IDataSender sender) {
  +        ObjectName senderName = null;
  +        try {
  +            ObjectName clusterName = cluster.getObjectName();
  +            MBeanServer mserver = cluster.getMBeanServer();
  +            senderName = new ObjectName(clusterName.getDomain()
  +                    + ":type=IDataSender,host="
  +                    + clusterName.getKeyProperty("host") + ",senderAddress="
  +                    + sender.getAddress().getHostAddress() + ",senderPort="
  +                    + sender.getPort());
  +        } catch (Exception e) {
  +            log.warn(e);
  +        }
  +        return senderName;
       }
   
       /**
  -     * Get all properties keys
  +     * compress data
  +     * 
  +     * @see XByteBuffer#createDataPackage(byte[])
  +     * @param data
        * @return
  +     * @throws IOException
  +     *             FIXME get CompressMessageData from cluster instance
        */
  -    public Iterator getPropertyNames() {
  -        return properties.keySet().iterator();
  +    protected byte[] convertSenderData(byte[] data) throws IOException {
  +        return XByteBuffer.createDataPackage(data, isCompress());
       }
   
  -    
  -    /** 
  -     * remove a configured property.
  -     * @param key
  -     */
  -    public void removeProperty(String key) {
  -        properties.remove(key);
  -    }
  -    
       /**
  -     * @return Returns the failureCounter.
  +     * Send message to concrete sender. If autoConnect is true, check whether
  +     * the connection is broken and then reconnect the complete sender.
  +     * <ul>
  +     * <li>On failure the suspect flag is set to true. After successfully sending
  +     * the suspect flag is set to false.</li>
  +     * <li>Stats are only updated after successful sending</li>
  +     * </ul>
  +     * 
  +     * @param sessionId
  +     *            Unique Message Id
  +     * @param data
  +     *            message Data
  +     * @param sender
  +     *            concrete message sender
  +     * @throws java.io.IOException
        */
  -    public long getFailureCounter() {
  -        return failureCounter;
  +    protected void sendMessageData(String sessionId, byte[] data,
  +            IDataSender sender) throws java.io.IOException {
  +        if (sender == null)
  +            throw new java.io.IOException(
  +                    "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
  +        try {
  +            if (autoConnect && !sender.isConnected())
  +                sender.connect();
  +            sender.sendMessage(sessionId, data);
  +            sender.setSuspect(false);
  +            addStats(data.length);
  +        } catch (Exception x) {
  +            if (log.isWarnEnabled()) {
  +                if (!sender.getSuspect()) {
  +                    log
  +                            .warn(
  +                                    "Unable to send replicated message, is server down?",
  +                                    x);
  +                }
  +            }
  +            sender.setSuspect(true);
  +            failureCounter++;
  +        }
  +
       }
  +
   }
  \ No newline at end of file
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org