Posted to dev@tomcat.apache.org by ma...@apache.org on 2008/11/15 19:13:09 UTC

svn commit: r717903 - in /tomcat/tc6.0.x/trunk: ./ STATUS.txt java/org/apache/catalina/ha/deploy/FileMessageFactory.java webapps/docs/changelog.xml

Author: markt
Date: Sat Nov 15 10:13:08 2008
New Revision: 717903

URL: http://svn.apache.org/viewvc?rev=717903&view=rev
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
Fix out of order message processing issues

Modified:
    tomcat/tc6.0.x/trunk/   (props changed)
    tomcat/tc6.0.x/trunk/STATUS.txt
    tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java
    tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml

Propchange: tomcat/tc6.0.x/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 15 10:13:08 2008
@@ -1 +1 @@
-/tomcat/trunk:673796,673820,683982,684001,684081,684234,684269-684270,687503,687645,690781,691392,691805,692748,695053,695311,696780,696782,698012,698227,698236,698613,711126
+/tomcat/trunk:673796,673820,683982,684001,684081,684234,684269-684270,687503,687645,690781,691392,691805,692748,695053,695311,696780,696782,698012,698227,698236,698613,699427,711126

Modified: tomcat/tc6.0.x/trunk/STATUS.txt
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/STATUS.txt?rev=717903&r1=717902&r2=717903&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/STATUS.txt (original)
+++ tomcat/tc6.0.x/trunk/STATUS.txt Sat Nov 15 10:13:08 2008
@@ -114,13 +114,6 @@
   +1: rjung, mturk, markt, pero
    0: remm (also affects to the two other AJP connectors)
 
-* Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
-  Fix NPE and out of order message processing issues
-  http://svn.apache.org/viewvc?rev=699427&view=rev
-  +1: markt, fhanik, pero
-   0: 
-  -1: 
-
 * Fix serialisation issue reported by Find Bugs
   http://svn.apache.org/viewvc?rev=699633&view=rev
   +1: markt
@@ -248,7 +241,3 @@
   +1: markt, fhanik
   -1: 
 
-* Don't swallow input if we know the connection is going to be closed
-  http://svn.apache.org/viewvc?rev=714214&view=rev
-  +1: billbarker
-  -1:

Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java?rev=717903&r1=717902&r2=717903&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java (original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/ha/deploy/FileMessageFactory.java Sat Nov 15 10:13:08 2008
@@ -22,6 +22,9 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileNotFoundException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * This factory is used to read files and write files by splitting them up into
@@ -74,7 +77,7 @@
     protected FileOutputStream out;
 
     /**
-     * The number of messages we have read or written
+     * The number of messages we have written
      */
     protected int nrOfMessagesProcessed = 0;
 
@@ -87,6 +90,19 @@
      * The total number of packets that we split this file into
      */
     protected long totalNrOfMessages = 0;
+    
+    /**
+     * The number of the last message procssed. Message IDs are 1 based.
+     */
+    protected AtomicLong lastMessageProcessed = new AtomicLong(0);
+    
+    /**
+     * Messages received out of order are held in the buffer until required. If
+     * everything is worked as expected, messages will spend very little time in
+     * the buffer.
+     */
+    protected Map<Long, FileMessage> msgBuffer =
+        new ConcurrentHashMap<Long, FileMessage>();
 
     /**
      * The bytes that we hold the data in, not thread safe.
@@ -94,6 +110,12 @@
     protected byte[] data = new byte[READ_SIZE];
 
     /**
+     * Flag that indicates if a thread is writing messages to disk. Access to
+     * this flag must be synchronised.
+     */
+    protected boolean isWriting = false;
+
+    /**
      * Private constructor, either instantiates a factory to read or write. <BR>
      * When openForWrite==true, then a the file, f, will be created and an
      * output stream is opened to write to it. <BR>
@@ -205,25 +227,65 @@
         if (log.isDebugEnabled())
             log.debug("Message " + msg + " data " + msg.getData()
                     + " data length " + msg.getDataLength() + " out " + out);
-        if (out != null) {
-            out.write(msg.getData(), 0, msg.getDataLength());
-            nrOfMessagesProcessed++;
+        
+        if (msg.getMessageNumber() <= lastMessageProcessed.get()) {
+            // Duplicate of message already processed
+            log.warn("Receive Message again -- Sender ActTimeout too short [ path: "
+                    + msg.getContextPath()
+                    + " war: "
+                    + msg.getFileName()
+                    + " data: "
+                    + msg.getData()
+                    + " data length: " + msg.getDataLength() + " ]");
+            return false;
+        }
+        
+        FileMessage previous =
+            msgBuffer.put(new Long(msg.getMessageNumber()), msg);
+        if (previous !=null) {
+            // Duplicate of message not yet processed
+            log.warn("Receive Message again -- Sender ActTimeout too short [ path: "
+                    + msg.getContextPath()
+                    + " war: "
+                    + msg.getFileName()
+                    + " data: "
+                    + msg.getData()
+                    + " data length: " + msg.getDataLength() + " ]");
+            return false;
+        }
+        
+        FileMessage next = null;
+        synchronized (this) {
+            if (!isWriting) {
+                next = msgBuffer.get(new Long(lastMessageProcessed.get() + 1));
+                if (next != null) {
+                    isWriting = true;
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+        }
+        
+        while (next != null) {
+            out.write(next.getData(), 0, next.getDataLength());
+            lastMessageProcessed.incrementAndGet();
             out.flush();
-            if (msg.getMessageNumber() == msg.getTotalNrOfMsgs()) {
+            if (next.getMessageNumber() == next.getTotalNrOfMsgs()) {
                 out.close();
                 cleanup();
                 return true;
-            }//end if
-        } else {
-            if (log.isWarnEnabled())
-                log.warn("Receive Message again -- Sender ActTimeout to short [ path: "
-                                + msg.getContextPath()
-                                + " war: "
-                                + msg.getFileName()
-                                + " data: "
-                                + msg.getData()
-                                + " data length: " + msg.getDataLength() + " ]");
+            }
+            synchronized(this) {
+                next =
+                    msgBuffer.get(new Long(lastMessageProcessed.get() + 1));
+                if (next == null) {
+                    isWriting = false;
+                }
+            }
         }
+        
         return false;
     }//writeMessage
 
@@ -248,6 +310,8 @@
         data = null;
         nrOfMessagesProcessed = 0;
         totalNrOfMessages = 0;
+        msgBuffer.clear();
+        lastMessageProcessed = null;
     }
 
     /**
@@ -309,4 +373,4 @@
         return file;
     }
 
-}
\ No newline at end of file
+}

Modified: tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml?rev=717903&r1=717902&r2=717903&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc6.0.x/trunk/webapps/docs/changelog.xml Sat Nov 15 10:13:08 2008
@@ -249,6 +249,10 @@
         <bug>45618</bug>: Make sure NIO selector is closed when no longer used.
         Unlikely to be an issue in normal usage. (markt)
       </fix>
+      <fix>
+        <bug>45851</bug>: Fix out of order message processing issues with the
+        FarmWarDeployer. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Webapps">



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org
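
Note on the FileMessageFactory change above: the patch buffers messages that
arrive early in msgBuffer and writes them only once lastMessageProcessed + 1
is available. The sketch below illustrates that reordering idea in isolation.
It is not the Tomcat code. The class ReorderBuffer, its method names, and the
in-memory ByteArrayOutputStream are hypothetical, chosen so the example runs
on its own. It also simplifies the locking: the whole accept() method is
synchronized, whereas the commit uses an isWriting flag so that disk writes
happen outside the lock. Unlike the patch, it removes each chunk from the
buffer as soon as it is written.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical illustration of in-order reassembly; not part of Tomcat.
 * Sequence numbers are 1-based, matching the message IDs in the patch.
 */
final class ReorderBuffer {
    // Chunks that arrived ahead of their turn, keyed by sequence number.
    private final Map<Long, byte[]> pending = new HashMap<>();
    // Stand-in for the FileOutputStream used by the real factory.
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private final long total;
    private long lastWritten = 0;

    ReorderBuffer(long total) {
        this.total = total;
    }

    /**
     * Accepts one chunk. Returns true only when this call wrote the final
     * chunk, false for duplicates and for chunks that leave gaps unfilled.
     */
    synchronized boolean accept(long seq, byte[] chunk) {
        // Duplicate of a chunk that was already written or already buffered.
        if (seq <= lastWritten || pending.putIfAbsent(seq, chunk) != null) {
            return false;
        }
        // Drain every chunk that is now contiguous with what was written.
        byte[] next;
        while ((next = pending.remove(lastWritten + 1)) != null) {
            out.write(next, 0, next.length);
            lastWritten++;
            if (lastWritten == total) {
                return true;
            }
        }
        return false;
    }

    /** Returns a copy of everything written so far, in sequence order. */
    synchronized byte[] written() {
        return out.toByteArray();
    }
}
```

With a total of three chunks, delivering chunk 2 first writes nothing,
delivering chunk 1 then writes chunks 1 and 2 together, and delivering
chunk 3 completes the file and returns true.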


Re: svn commit: r717903 - in /tomcat/tc6.0.x/trunk: ./ STATUS.txt java/org/apache/catalina/ha/deploy/FileMessageFactory.java webapps/docs/changelog.xml

Posted by Mark Thomas <ma...@apache.org>.
Konstantin Kolinko wrote:
> This removed Bill Barker's proposal ("Don't swallow input ...") from STATUS.txt
> 
> It looks like it was not intentional.

It wasn't. Not sure why my update didn't catch that change. I'll restore it.

Thanks for the heads up.

Mark

> 
> Best regards,
> Konstantin Kolinko
> 
> 
> 2008/11/15  <ma...@apache.org>:
>> Author: markt
>> Date: Sat Nov 15 10:13:08 2008
>> New Revision: 717903
>>
>> URL: http://svn.apache.org/viewvc?rev=717903&view=rev
>> Log:
>> Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
>> Fix out of order message processing issues
>>
>> (...)
>> ==============================================================================
>> --- tomcat/tc6.0.x/trunk/STATUS.txt (original)
>> +++ tomcat/tc6.0.x/trunk/STATUS.txt Sat Nov 15 10:13:08 2008
>> @@ -114,13 +114,6 @@
>>   +1: rjung, mturk, markt, pero
>>    0: remm (also affects to the two other AJP connectors)
>>
>> -* Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
>> -  Fix NPE and out of order message processing issues
>> -  http://svn.apache.org/viewvc?rev=699427&view=rev
>> -  +1: markt, fhanik, pero
>> -   0:
>> -  -1:
>> -
>>  * Fix serialisation issue reported by Find Bugs
>>   http://svn.apache.org/viewvc?rev=699633&view=rev
>>   +1: markt
>> @@ -248,7 +241,3 @@
>>   +1: markt, fhanik
>>   -1:
>>
>> -* Don't swallow input if we know the connection is going to be closed
>> -  http://svn.apache.org/viewvc?rev=714214&view=rev
>> -  +1: billbarker
>> -  -1:
>>
>> (...)
> 
> 





Re: svn commit: r717903 - in /tomcat/tc6.0.x/trunk: ./ STATUS.txt java/org/apache/catalina/ha/deploy/FileMessageFactory.java webapps/docs/changelog.xml

Posted by Konstantin Kolinko <kn...@gmail.com>.
This removed Bill Barker's proposal ("Don't swallow input ...") from STATUS.txt

It looks like it was not intentional.

Best regards,
Konstantin Kolinko


2008/11/15  <ma...@apache.org>:
> Author: markt
> Date: Sat Nov 15 10:13:08 2008
> New Revision: 717903
>
> URL: http://svn.apache.org/viewvc?rev=717903&view=rev
> Log:
> Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
> Fix out of order message processing issues
>
> (...)
> ==============================================================================
> --- tomcat/tc6.0.x/trunk/STATUS.txt (original)
> +++ tomcat/tc6.0.x/trunk/STATUS.txt Sat Nov 15 10:13:08 2008
> @@ -114,13 +114,6 @@
>   +1: rjung, mturk, markt, pero
>    0: remm (also affects to the two other AJP connectors)
>
> -* Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=45851
> -  Fix NPE and out of order message processing issues
> -  http://svn.apache.org/viewvc?rev=699427&view=rev
> -  +1: markt, fhanik, pero
> -   0:
> -  -1:
> -
>  * Fix serialisation issue reported by Find Bugs
>   http://svn.apache.org/viewvc?rev=699633&view=rev
>   +1: markt
> @@ -248,7 +241,3 @@
>   +1: markt, fhanik
>   -1:
>
> -* Don't swallow input if we know the connection is going to be closed
> -  http://svn.apache.org/viewvc?rev=714214&view=rev
> -  +1: billbarker
> -  -1:
>
> (...)
