You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/02/15 10:32:39 UTC

cvs commit: jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp ReplicationTransmitter.java

pero        2005/02/15 01:32:39

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        ReplicationTransmitter.java
  Log:
  Add dynamic property handling to ReplicationTransmitter to transfer attributes to senders
  Add autoConnect and waitForAck handling
  
  Revision  Changes    Path
  1.21      +157 -6    jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
  
  Index: ReplicationTransmitter.java
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- ReplicationTransmitter.java	27 Dec 2004 09:30:36 -0000	1.20
  +++ ReplicationTransmitter.java	15 Feb 2005 09:32:39 -0000	1.21
  @@ -16,7 +16,9 @@
   
   package org.apache.catalina.cluster.tcp;
   
  +import java.util.HashMap;
   import java.util.Iterator;
  +import java.util.Map;
   
   import javax.management.MBeanServer;
   import javax.management.ObjectName;
  @@ -26,12 +28,25 @@
   import org.apache.catalina.cluster.Member;
   import org.apache.catalina.cluster.io.XByteBuffer;
   import org.apache.catalina.util.StringManager;
  +import org.apache.tomcat.util.IntrospectionUtils;
   
  +
  +/**
  + * @author Peter Rossbach
  + * @author Filip Hanik
  + * @version 1.2
  + * 
  + */
   public class ReplicationTransmitter implements ClusterSender {
       private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
               .getLog(ReplicationTransmitter.class);
   
       /**
  +     * The descriptive information about this implementation.
  +     */
  +    private static final String info = "ReplicationTransmitter/1.2";
  +
  +    /**
        * The string manager for this package.
        */
       protected StringManager sm = StringManager.getManager(Constants.Package);
  @@ -49,10 +64,31 @@
   
       private long ackTimeout = 15000; //15 seconds by default
   
  +    private boolean waitForAck = true ;
  +    
       private SimpleTcpCluster cluster;
   
       private ObjectName objectName;
   
  +    private boolean autoConnect = true ;
  +
  +    private Map properties = new HashMap();
  +
  +    private long failureCounter = 0 ;
  +
  +    // ------------------------------------------------------------- Properties
  +
  +    /**
  +     * Return descriptive information about this implementation and the
  +     * corresponding version number, in the format
  +     * <code>&lt;description&gt;/&lt;version&gt;</code>.
  +     */
  +    public String getInfo() {
  +
  +        return (info);
  +
  +    }
  +
       private synchronized void addStats(int length) {
           nrOfRequests++;
           totalBytes += length;
  @@ -64,6 +100,16 @@
           }
   
       }
  +    
  +    /**
  +     * Reset the sender statistics (request count, total bytes and failure counter).
  +     */
  +    public synchronized void resetStatistics() {
  +        nrOfRequests = 0;
  +        totalBytes = 0;
  +        failureCounter = 0;
  +    }
  +
       /**
        * @return Returns the nrOfRequests.
        */
  @@ -88,13 +134,14 @@
               throw new IllegalArgumentException(msg);
   
       }
  -
  +    
       public synchronized void add(Member member) {
           try {
               String key = getKey(member);
               if (!map.containsKey(key)) {
                   IDataSender sender = IDataSenderFactory.getIDataSender(
                           replicationMode, member);
  +                transferSenderProperty(sender);
                   map.put(key, sender);
                   registerSenderMBean(member, sender);
               }
  @@ -103,7 +150,19 @@
           }
       }//add
   
  -    private String getKey(Member member) {
  +    /**
  +     * Transfer all properties from the transmitter to the concrete sender.
  +     * @param sender the sender that receives the stored properties
  +     */
  +    protected void transferSenderProperty(IDataSender sender) {
  +        for (Iterator iter = getPropertyNames(); iter.hasNext();) {
  +            String pkey = (String) iter.next();
  +            Object value = getProperty(pkey);
  +            IntrospectionUtils.setProperty(sender, pkey, value.toString());                    
  +        }
  +    }
  +
  +    protected String getKey(Member member) {
           return member.getHost() + ":" + member.getPort();
       }
   
  @@ -234,16 +293,28 @@
           return result;
       }
   
  +    /**
  +     * Send message to concrete sender. If autoConnect is true and the sender is not
  +     * connected, the sender is connected before the message is sent.
  +     * <ul>
  +     * <li>On failure the suspect flag is set to true. After successfully
  +     * sending, the suspect flag is set to false.</li>
  +     * <li>Statistics are only updated after successful sending.</li>
  +     * </ul>
  +     * 
  +     * @param sessionId Unique Message Id
  +     * @param data message Data
  +     * @param sender concrete message sender
  +     * @throws java.io.IOException
  +     */
       protected void sendMessageData(String sessionId, byte[] data,
               IDataSender sender) throws java.io.IOException {
           if (sender == null)
               throw new java.io.IOException(
                       "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
           try {
  -            if (!sender.isConnected())
  +            if (autoConnect  && !sender.isConnected())
                   sender.connect();
  -            //set the timeout, will be ignored by async senders
  -            sender.setAckTimeout(getAckTimeout());
               sender.sendMessage(sessionId, data);
               sender.setSuspect(false);
               addStats(data.length);
  @@ -257,7 +328,7 @@
                   }
               }
               sender.setSuspect(true);
  -
  +            failureCounter++ ;
           }
   
       }
  @@ -293,19 +364,53 @@
           return replicationMode;
       }
   
  +    /**
  +     * @return <code>true</code> if the replication mode is SYNC_MODE or POOLED_SYNC_MODE
  +     * @deprecated since Version 1.1
  +     */
       public boolean getIsSenderSynchronized() {
           return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
                   || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
       }
   
  +    /**
  +     * @return Returns the autoConnect.
  +     */
  +    public boolean isAutoConnect() {
  +        return autoConnect;
  +    }
  +    /**
  +     * @param autoConnect The autoConnect to set.
  +     */
  +    public void setAutoConnect(boolean autoConnect) {
  +        this.autoConnect = autoConnect;
  +        setProperty("autoConnect", String.valueOf(autoConnect));
  +        
  +    }
  + 
       public long getAckTimeout() {
           return ackTimeout;
       }
   
       public void setAckTimeout(long ackTimeout) {
           this.ackTimeout = ackTimeout;
  +        setProperty("ackTimeout", String.valueOf(ackTimeout));
       }
   
  +    /**
  +     * @return Returns the waitForAck.
  +     */
  +    public boolean isWaitForAck() {
  +        return waitForAck;
  +    }
  +    /**
  +     * @param waitForAck The waitForAck to set.
  +     */
  +    public void setWaitForAck(boolean waitForAck) {
  +        this.waitForAck = waitForAck;
  +        setProperty("waitForAck", String.valueOf(waitForAck));
  +   }
  +    
       /*
        * (non-Javadoc)
        * 
  @@ -316,4 +421,50 @@
   
       }
   
  +    /** 
  +     * Store a config attribute; stored attributes are set on each newly added sender via IntrospectionUtils.
  +     * @param name attribute name
  +     * @param value attribute value
  +     */
  +    public void setProperty( String name, Object value ) {
  +        if( log.isTraceEnabled())
  +            log.trace(sm.getString("ReplicationTransmitter.setProperty", name, value));
  +
  +        properties.put(name, value);
  +    }
  +
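  +    /**
  +     * Get the current value of a config attribute.
  +     * @param key attribute name
  +     * @return the stored value, or <code>null</code> if the attribute is not set
  +     */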
  +    public Object getProperty( String key ) {
  +        if( log.isTraceEnabled())
  +            log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
  +        return properties.get(key);
  +    }
  +
  +    /**
  +     * Get all property keys.
  +     * @return iterator over the names of all stored properties
  +     */
  +    public Iterator getPropertyNames() {
  +        return properties.keySet().iterator();
  +    }
  +
  +    
  +    /** 
  +     * remove a configured property.
  +     * @param key
  +     */
  +    public void removeProperty(String key) {
  +        properties.remove(key);
  +    }
  +    
  +    /**
  +     * @return Returns the failureCounter.
  +     */
  +    public long getFailureCounter() {
  +        return failureCounter;
  +    }
   }
  \ No newline at end of file
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: tomcat-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: tomcat-dev-help@jakarta.apache.org