You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by pe...@apache.org on 2005/11/18 22:07:36 UTC

svn commit: r345567 - in /tomcat/container/tc5.5.x: modules/cluster/src/share/org/apache/catalina/cluster/tcp/ modules/cluster/src/share/org/apache/catalina/cluster/util/ webapps/docs/

Author: pero
Date: Fri Nov 18 13:07:23 2005
New Revision: 345567

URL: http://svn.apache.org/viewcvs?rev=345567&view=rev
Log:
Fix that sendMessage signature at all DataSender subclasses must be changed.
Fix that socket at o.a.c.cluster.tcp.FastAsyncSocketSender can be disconnect/connect

Modified:
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java
    tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java
    tomcat/container/tc5.5.x/webapps/docs/changelog.xml

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/AsyncSocketSender.java Fri Nov 18 13:07:23 2005
@@ -152,23 +152,23 @@
         super.disconnect();
     }
 
-    /*
+    /**
      * Send message to queue for later sending
      * 
-     * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(java.lang.String,
-     *      byte[])
+     * @see org.apache.catalina.cluster.tcp.DataSender#pushMessage(ClusterData)
      */
-    public void sendMessage(String messageid, ClusterData data)
+    public void sendMessage(ClusterData data)
             throws java.io.IOException {
-        SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(messageid, data);
+        SmartQueue.SmartEntry entry = new SmartQueue.SmartEntry(data.getUniqueId(), data);
         queue.add(entry);
         synchronized (this) {
             inQueueCounter++;
-            queueThread.incQueuedNrOfBytes(data.getMessage().length);
+            if(queueThread != null)
+                queueThread.incQueuedNrOfBytes(data.getMessage().length);
        }
         if (log.isTraceEnabled())
             log.trace(sm.getString("AsyncSocketSender.queue.message",
-                    getAddress().getHostAddress(), new Integer(getPort()), messageid, new Long(
+                    getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
                             data.getMessage().length)));
     }
 

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/DataSender.java Fri Nov 18 13:07:23 2005
@@ -666,6 +666,11 @@
        if(isConnected())
            return ;
        try {
+           throw new Exception("open socket") ;
+       } catch ( Exception ex2) {
+           log.debug("open socket exception",ex2);
+       }
+       try {
             createSocket();
             if (isWaitForAck())
                 socket.setSoTimeout((int) ackTimeout);

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/FastAsyncSocketSender.java Fri Nov 18 13:07:23 2005
@@ -273,15 +273,16 @@
 
     // --------------------------------------------------------- Public Methods
 
-    /*
-     * Connect to socket and start background thread to ppush queued messages
+    /**
+     * Connect to socket and start background thread to push queued messages
      * 
      * @see org.apache.catalina.cluster.tcp.IDataSender#connect()
      */
     public void connect() throws java.io.IOException {
         super.connect();
         checkThread();
-        queue.start() ;
+        if(!queue.isEnabled())
+            queue.start() ;
     }
 
     /**
@@ -291,25 +292,30 @@
      */
     public void disconnect() {
         stopThread();
+        // delete "remove" lock at queue
         queue.stop() ;
+        // enable that sendMessage can add new messages
+        queue.start() ;
+        // close socket
         super.disconnect();
     }
 
     /**
      * Send message to queue for later sending.
      * 
-     * @see org.apache.catalina.cluster.tcp.IDataSender#sendMessage(ClusterData)
+     * @see org.apache.catalina.cluster.tcp.DataSender#pushMessage(ClusterData)
      */
-    public void sendMessage(String messageid, ClusterData data)
+    public void sendMessage(ClusterData data)
             throws java.io.IOException {
-        queue.add(messageid, data);
+        queue.add(data.getUniqueId(), data);
         synchronized (this) {
             inQueueCounter++;
-            queueThread.incQueuedNrOfBytes(data.getMessage().length);
-       }
-       if (log.isTraceEnabled())
+            if(queueThread != null)
+                queueThread.incQueuedNrOfBytes(data.getMessage().length);
+        }
+        if (log.isTraceEnabled())
             log.trace(sm.getString("AsyncSocketSender.queue.message",
-                    getAddress().getHostAddress(), new Integer(getPort()), messageid, new Long(
+                    getAddress().getHostAddress(), new Integer(getPort()), data.getUniqueId(), new Long(
                             data.getMessage().length)));
     }
 
@@ -417,17 +423,20 @@
         }
 
 
-        public void stopRunning() {
+        /**
+         * Stop backend thread!
+         */
+         public void stopRunning() {
             keepRunning = false;
         }
         
         
-        /* Get the objects from queue and send all mesages to the sender.
+        /**
+         * Get the objects from queue and send all mesages to the sender.
          * @see java.lang.Runnable#run()
          */
         public void run() {
             while (keepRunning) {
-                long queueSize;
                 LinkObject entry = getQueuedMessage();
                 if (entry != null) {
                     pushQueuedMessages(entry);
@@ -442,7 +451,8 @@
         }
 
         /**
-         * @return
+         * Get List of queue cluster messages
+         * @return list of cluster messages
          */
         protected LinkObject getQueuedMessage() {
             // get a link list of all queued objects

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java Fri Nov 18 13:07:23 2005
@@ -90,32 +90,32 @@
 
     //  ----------------------------------------------------- Public Methode
 
-    public void connect() throws java.io.IOException {
+    public synchronized void connect() throws java.io.IOException {
         //do nothing, happens in the socket sender itself
         senderQueue.open();
         setSocketConnected(true);
         connectCounter++;
     }
 
-    public void disconnect() {
+    public synchronized void disconnect() {
         senderQueue.close();
         setSocketConnected(false);
         disconnectCounter++;
     }
 
     /**
-     * send Message and use a pool of SocketSenders
+     * send message and use a pool of SocketSenders
      * 
      * @param messageId Message unique identifier
      * @param data Message data
      * @throws java.io.IOException
      */
-    public void sendMessage(String messageId, ClusterData data) throws IOException {
+    public void sendMessage(ClusterData data) throws IOException {
         //get a socket sender from the pool
         if(!isConnected()) {
             synchronized(this) {
-            if(!isConnected())
-                connect();
+                if(!isConnected())
+                    connect();
             }
         }
         SocketSender sender = senderQueue.getSender(0);

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java Fri Nov 18 13:07:23 2005
@@ -822,7 +822,6 @@
         // FIXME add Stats how much comress and uncompress messages and bytes are transfered
         if ((isCompress() && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN)
                 || msg.getCompress() == ClusterMessage.FLAG_ALLOWED) {
-            System.out.println("msg compress: " + msg.getUniqueId() );
             gout = new GZIPOutputStream(outs);
             out = new ObjectOutputStream(gout);
         } else {

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/FastQueue.java Fri Nov 18 13:07:23 2005
@@ -65,19 +65,10 @@
      */
     private boolean doStats = false;
 
-    /**
-     *  
-     */
     private boolean inAdd = false;
 
-    /**
-     *  
-     */
     private boolean inRemove = false;
 
-    /**
-     *  
-     */
     private boolean inMutex = false;
 
     /**
@@ -141,14 +132,8 @@
      */
     private long avgSize = 0;
 
-    /*
-     *  
-     */
     private int maxSizeSample = 0;
 
-    /*
-     *  
-     */
     private long avgSizeSample = 0;
 
     /**
@@ -206,7 +191,7 @@
         lock.setRemoveWaitTimeout(removeWaitTimeout);
     }
 
-    /*
+    /**
      * get Max Queue length
      * 
      * @see org.apache.catalina.cluster.util.IQueue#getMaxQueueLength()
@@ -230,42 +215,42 @@
         }
     }
 
-    /*
+    /**
      * @return Returns the checkLock.
      */
     public boolean isCheckLock() {
         return checkLock;
     }
 
-    /*
+    /**
      * @param checkLock The checkLock to set.
      */
     public void setCheckLock(boolean checkLock) {
         this.checkLock = checkLock;
     }
 
-    /*
+    /**
      * @return Returns the doStats.
      */
     public boolean isDoStats() {
         return doStats;
     }
 
-    /*
+    /**
      * @param doStats The doStats to set.
      */
     public void setDoStats(boolean doStats) {
         this.doStats = doStats;
     }
 
-    /*
+    /**
      * @return Returns the timeWait.
      */
     public boolean isTimeWait() {
         return timeWait;
     }
 
-    /*
+    /**
      * @param timeWait The timeWait to set.
      */
     public void setTimeWait(boolean timeWait) {
@@ -426,7 +411,8 @@
         return sz;
     }
 
-    /* Add new data to the queue
+    /**
+     * Add new data to the queue
      * @see org.apache.catalina.cluster.util.IQueue#add(java.lang.String, java.lang.Object)
      * FIXME extract some method
      */
@@ -436,7 +422,7 @@
 
         if (!enabled) {
             if (log.isInfoEnabled())
-                log.info("FastQueue: queue disabled, add aborted");
+                log.info("FastQueue.add: queue disabled, add aborted");
             return false;
         }
 
@@ -450,7 +436,7 @@
             }
 
             if (log.isTraceEnabled()) {
-                log.trace("FastQueue: add starting with size " + size);
+                log.trace("FastQueue.add: starting with size " + size);
             }
             if (checkLock) {
                 if (inAdd)
@@ -464,7 +450,7 @@
             if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
                 ok = false;
                 if (log.isTraceEnabled()) {
-                    log.trace("FastQueue: Could not add, since queue is full ("
+                    log.trace("FastQueue.add: Could not add, since queue is full ("
                             + size + ">=" + maxQueueLength + ")");
                 }
 
@@ -477,7 +463,7 @@
                     if (last == null) {
                         ok = false;
                         log
-                                .error("FastQueue: Could not add, since last is null although size is "
+                                .error("FastQueue.add: Could not add, since last is null although size is "
                                         + size + " (>0)");
                     } else {
                         last.append(element);
@@ -509,24 +495,24 @@
             }
 
             if (first == null) {
-                log.error("FastQueue: first is null, size is " + size
+                log.error("FastQueue.add: first is null, size is " + size
                         + " at end of add");
             }
             if (last == null) {
-                log.error("FastQueue: last is null, size is " + size
+                log.error("FastQueue.add: last is null, size is " + size
                         + " at end of add");
             }
 
             if (checkLock) {
                 if (!inMutex)
-                    log.warn("FastQueue: Cancelled by other mutex in add");
+                    log.warn("FastQueue.add: Cancelled by other mutex in add");
                 inMutex = false;
                 if (!inAdd)
-                    log.warn("FastQueue: Cancelled by other add");
+                    log.warn("FastQueue.add: Cancelled by other add");
                 inAdd = false;
             }
             if (log.isTraceEnabled()) {
-                log.trace("FastQueue: add ending with size " + size);
+                log.trace("FastQueue.add: add ending with size " + size);
             }
 
             if (timeWait) {
@@ -541,7 +527,8 @@
         return ok;
     }
 
-    /* remove the complete queued object list
+    /**
+     * remove the complete queued object list
      * @see org.apache.catalina.cluster.util.IQueue#remove()
      * FIXME extract some method
      */
@@ -552,7 +539,7 @@
 
         if (!enabled) {
             if (log.isInfoEnabled())
-                log.info("FastQueue: queue disabled, remove aborted");
+                log.info("FastQueue.remove: queue disabled, remove aborted");
             return null;
         }
 
@@ -571,11 +558,10 @@
                         removeErrorCounter++;
                     }
                     if (log.isInfoEnabled())
-                        log
-                                .info("FastQueue: Remove aborted although queue enabled");
+                        log.info("FastQueue.remove: Remove aborted although queue enabled");
                 } else {
                     if (log.isInfoEnabled())
-                        log.info("FastQueue: queue disabled, remove aborted");
+                        log.info("FastQueue.remove: queue disabled, remove aborted");
                 }
                 return null;
             }
@@ -585,14 +571,14 @@
             }
 
             if (log.isTraceEnabled()) {
-                log.trace("FastQueue: remove starting with size " + size);
+                log.trace("FastQueue.remove: remove starting with size " + size);
             }
             if (checkLock) {
                 if (inRemove)
-                    log.warn("FastQueue: Detected other remove");
+                    log.warn("FastQueue.remove: Detected other remove");
                 inRemove = true;
                 if (inMutex)
-                    log.warn("FastQueue: Detected other mutex in remove");
+                    log.warn("FastQueue.remove: Detected other mutex in remove");
                 inMutex = true;
             }
 
@@ -604,7 +590,7 @@
                 } else {
                     removeErrorCounter++;
                     log
-                            .error("FastQueue: Could not remove, since first is null although size is "
+                            .error("FastQueue.remove: Could not remove, since first is null although size is "
                                     + size + " (>0)");
                 }
             }
@@ -614,14 +600,14 @@
 
             if (checkLock) {
                 if (!inMutex)
-                    log.warn("FastQueue: Cancelled by other mutex in remove");
+                    log.warn("FastQueue.remove: Cancelled by other mutex in remove");
                 inMutex = false;
                 if (!inRemove)
-                    log.warn("FastQueue: Cancelled by other remove");
+                    log.warn("FastQueue.remove: Cancelled by other remove");
                 inRemove = false;
             }
             if (log.isTraceEnabled()) {
-                log.trace("FastQueue: remove ending with size " + size);
+                log.trace("FastQueue.remove: remove ending with size " + size);
             }
 
             if (timeWait) {

Modified: tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java (original)
+++ tomcat/container/tc5.5.x/modules/cluster/src/share/org/apache/catalina/cluster/util/SingleRemoveSynchronizedAddLock.java Fri Nov 18 13:07:23 2005
@@ -196,6 +196,7 @@
      */
     public synchronized boolean lockRemove() {
         removeLocked=false;
+        removeEnabled=true;
         if ( ( addLocked || ! dataAvailable ) && removeEnabled ) {
             remover=Thread.currentThread();
             do {

Modified: tomcat/container/tc5.5.x/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/webapps/docs/changelog.xml?rev=345567&r1=345566&r2=345567&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/webapps/docs/changelog.xml (original)
+++ tomcat/container/tc5.5.x/webapps/docs/changelog.xml Fri Nov 18 13:07:23 2005
@@ -145,6 +145,13 @@
   <subsection name="Cluster">
     <changelog>
       <fix>
+        Fix that sendMessage signature at all DataSender subclasses must be changed.
+        Now pooled and async modes working as expected. (pero)
+      </fix>
+      <fix>
+        Fix that socket at o.a.c.cluster.tcp.FastAsyncSocketSender can be disconnect/connect. (pero)
+      </fix>    
+      <fix>
         Fix cluster module build.xml script for new svn repository structure (pero)
       </fix>    
       <fix>



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org