You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/06/09 17:09:55 UTC

svn commit: r1133915 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/command/ main/java/org/apache/activemq/openwire/ main/java/org/apache/activemq/transport/nio/ main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/transport/tcp/ test/resources/

Author: dejanb
Date: Thu Jun  9 15:09:55 2011
New Revision: 1133915

URL: http://svn.apache.org/viewvc?rev=1133915&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-498 - make max frame size part of wire format negotiation

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java
    activemq/trunk/activemq-core/src/test/resources/activemq.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Thu Jun  9 15:09:55 2011
@@ -270,6 +270,15 @@ public class WireFormatInfo implements C
     public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
         setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay));
     }
+
+    public long getMaxFrameSize() throws IOException {
+        Long l = (Long)getProperty("MaxFrameSize");
+        return l == null ? 0 : l.longValue();
+    }
+
+    public void setMaxFrameSize(long maxFrameSize) throws IOException {
+        setProperty("MaxFrameSize", new Long(maxFrameSize));
+    }
     
    
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Thu Jun  9 15:09:55 2011
@@ -40,6 +40,7 @@ public final class OpenWireFormat implem
 
     public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
     public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
+    public static final int DEFAULT_MAX_FRAME_SIZE = 100 * 1024 * 1024; //100 MB
 
     static final byte NULL_TYPE = CommandTypes.NULL;
     private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
@@ -52,6 +53,7 @@ public final class OpenWireFormat implem
     private boolean cacheEnabled;
     private boolean tightEncodingEnabled;
     private boolean sizePrefixDisabled;
+    private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
 
     // The following fields are used for value caching
     private short nextMarshallCacheIndex;
@@ -103,7 +105,7 @@ public final class OpenWireFormat implem
 
     public String toString() {
         return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
-               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
+               + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled +  ", maxFrameSize=" + maxFrameSize + "}";
         // return "OpenWireFormat{id="+id+",
         // tightEncodingEnabled="+tightEncodingEnabled+"}";
     }
@@ -591,6 +593,14 @@ public final class OpenWireFormat implem
         return preferedWireFormatInfo;
     }
 
+    public long getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
+    public void setMaxFrameSize(long maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
+    }
+
     public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
 
         if (preferedWireFormatInfo == null) {
@@ -600,6 +610,9 @@ public final class OpenWireFormat implem
         this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
         info.setVersion(this.getVersion());
 
+        this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
+        info.setMaxFrameSize(this.getMaxFrameSize());
+
         this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
         info.setStackTraceEnabled(this.stackTraceEnabled);
 
@@ -647,4 +660,11 @@ public final class OpenWireFormat implem
         }
         return version2;
     }
+
+    protected long min(long version1, long version2) {
+        if (version1 < version2 && version1 > 0 || version2 <= 0) {
+            return version1;
+        }
+        return version2;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Thu Jun  9 15:09:55 2011
@@ -39,6 +39,7 @@ public class OpenWireFormatFactory imple
     private long maxInactivityDuration = 30*1000;
     private long maxInactivityDurationInitalDelay = 10*1000;
     private int cacheSize = 1024;
+    private long maxFrameSize = OpenWireFormat.DEFAULT_MAX_FRAME_SIZE;
 
     public WireFormat createWireFormat() {
         WireFormatInfo info = new WireFormatInfo();
@@ -53,6 +54,7 @@ public class OpenWireFormatFactory imple
             info.setMaxInactivityDuration(maxInactivityDuration);
             info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
             info.setCacheSize(cacheSize);
+            info.setMaxFrameSize(maxFrameSize);
         } catch (Exception e) {
             IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
             ise.initCause(e);
@@ -60,6 +62,7 @@ public class OpenWireFormatFactory imple
         }
 
         OpenWireFormat f = new OpenWireFormat(version);
+        f.setMaxFrameSize(maxFrameSize);
         f.setPreferedWireFormatInfo(info);
         return f;
     }
@@ -136,4 +139,12 @@ public class OpenWireFormatFactory imple
             long maxInactivityDurationInitalDelay) {
         this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
     }
+
+    public long getMaxFrameSize() {
+        return maxFrameSize;
+    }
+
+    public void setMaxFrameSize(long maxFrameSize) {
+        this.maxFrameSize = maxFrameSize;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Thu Jun  9 15:09:55 2011
@@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel;
 import javax.net.SocketFactory;
 
 import org.apache.activemq.command.Command;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -115,9 +116,14 @@ public class NIOTransport extends TcpTra
                     // for it.
                     inputBuffer.flip();
                     nextFrameSize = inputBuffer.getInt() + 4;
-                    if (nextFrameSize > maxFrameSize) {
-                        throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+
+                    if (wireFormat instanceof OpenWireFormat) {
+                        long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize();
+                        if (nextFrameSize > maxFrameSize) {
+                            throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+                        }
                     }
+
                     if (nextFrameSize > inputBuffer.capacity()) {
                         currentBuffer = ByteBuffer.allocate(nextFrameSize);
                         currentBuffer.putInt(nextFrameSize);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Jun  9 15:09:55 2011
@@ -70,8 +70,6 @@ public class TcpTransport extends Transp
     protected DataInputStream dataIn;
     protected TimeStampStream buffOut = null;
 
-    protected int maxFrameSize = 104857600; //100MB
-
 
     /**
      * The Traffic Class to be set on the socket.
@@ -323,14 +321,6 @@ public class TcpTransport extends Transp
         return socketBufferSize;
     }
 
-    public int getMaxFrameSize() {
-        return maxFrameSize;
-    }
-
-    public void setMaxFrameSize(int maxFrameSize) {
-        this.maxFrameSize = maxFrameSize;
-    }
-
     /**
      * Sets the buffer size to use on the socket
      */

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java Thu Jun  9 15:09:55 2011
@@ -152,7 +152,7 @@ public class WireformatNegociationTest e
     /**
      * @throws Exception
      */
-    public void testWireFomatInfoSeverVersion1() throws Exception {
+    public void testWireFormatInfoSeverVersion1() throws Exception {
 
         startServer("tcp://localhost:61616?wireFormat.version=1");
         startClient("tcp://localhost:61616");
@@ -170,7 +170,7 @@ public class WireformatNegociationTest e
     /**
      * @throws Exception
      */
-    public void testWireFomatInfoClientVersion1() throws Exception {
+    public void testWireFormatInfoClientVersion1() throws Exception {
 
         startServer("tcp://localhost:61616");
         startClient("tcp://localhost:61616?wireFormat.version=1");
@@ -188,7 +188,7 @@ public class WireformatNegociationTest e
     /**
      * @throws Exception
      */
-    public void testWireFomatInfoCurrentVersion() throws Exception {
+    public void testWireFormatInfoCurrentVersion() throws Exception {
 
         startServer("tcp://localhost:61616");
         startClient("tcp://localhost:61616");
@@ -203,7 +203,7 @@ public class WireformatNegociationTest e
         assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion());
     }
     
-    public void testWireFomatInactivityDurationInitalDelay() throws Exception {
+    public void testWireFormatInactivityDurationInitialDelay() throws Exception {
 
         startServer("tcp://localhost:61616");
         startClient("tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=60000");
@@ -218,4 +218,19 @@ public class WireformatNegociationTest e
         assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion());
     }
 
+    public void testWireFormatMaxFrameSize() throws Exception {
+
+        startServer("tcp://localhost:61616");
+        startClient("tcp://localhost:61616?wireFormat.maxFrameSize=1048576");
+
+        assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
+        assertNull("Async error: " + asyncError, asyncError.get());
+
+        assertNotNull(clientWF.get());
+        assertEquals(1048576, clientWF.get().getMaxFrameSize());
+
+        assertNotNull(serverWF.get());
+        assertEquals(1048576, serverWF.get().getMaxFrameSize());
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/resources/activemq.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/activemq.xml?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/activemq.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/activemq.xml Thu Jun  9 15:09:55 2011
@@ -28,7 +28,7 @@
   <broker useJmx="false"  xmlns="http://activemq.apache.org/schema/core" persistent="false">
 
     <transportConnectors>
-      <transportConnector uri="nio://localhost:61616?transport.maxFrameSize=10485760" />
+      <transportConnector uri="nio://localhost:61616?wireFormat.maxFrameSize=1048576" />
     </transportConnectors>
         
   </broker>