You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/06/09 17:09:55 UTC
svn commit: r1133915 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/command/
main/java/org/apache/activemq/openwire/
main/java/org/apache/activemq/transport/nio/
main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/a...
Author: dejanb
Date: Thu Jun 9 15:09:55 2011
New Revision: 1133915
URL: http://svn.apache.org/viewvc?rev=1133915&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-498 - make max frame size part of wire format negotiation
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java
activemq/trunk/activemq-core/src/test/resources/activemq.xml
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Thu Jun 9 15:09:55 2011
@@ -270,6 +270,15 @@ public class WireFormatInfo implements C
public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) throws IOException {
setProperty("MaxInactivityDurationInitalDelay", new Long(maxInactivityDurationInitalDelay));
}
+
+ public long getMaxFrameSize() throws IOException {
+ Long l = (Long)getProperty("MaxFrameSize");
+ return l == null ? 0 : l.longValue();
+ }
+
+ public void setMaxFrameSize(long maxFrameSize) throws IOException {
+ setProperty("MaxFrameSize", new Long(maxFrameSize));
+ }
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Thu Jun 9 15:09:55 2011
@@ -40,6 +40,7 @@ public final class OpenWireFormat implem
public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
+ public static final int DEFAULT_MAX_FRAME_SIZE = 100 * 1024 * 1024; //100 MB
static final byte NULL_TYPE = CommandTypes.NULL;
private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
@@ -52,6 +53,7 @@ public final class OpenWireFormat implem
private boolean cacheEnabled;
private boolean tightEncodingEnabled;
private boolean sizePrefixDisabled;
+ private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
// The following fields are used for value caching
private short nextMarshallCacheIndex;
@@ -103,7 +105,7 @@ public final class OpenWireFormat implem
public String toString() {
return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
- + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
+ + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}";
// return "OpenWireFormat{id="+id+",
// tightEncodingEnabled="+tightEncodingEnabled+"}";
}
@@ -591,6 +593,14 @@ public final class OpenWireFormat implem
return preferedWireFormatInfo;
}
+ public long getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(long maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
+
public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
if (preferedWireFormatInfo == null) {
@@ -600,6 +610,9 @@ public final class OpenWireFormat implem
this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
info.setVersion(this.getVersion());
+ this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
+ info.setMaxFrameSize(this.getMaxFrameSize());
+
this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
info.setStackTraceEnabled(this.stackTraceEnabled);
@@ -647,4 +660,11 @@ public final class OpenWireFormat implem
}
return version2;
}
+
+ protected long min(long version1, long version2) {
+ if (version1 < version2 && version1 > 0 || version2 <= 0) {
+ return version1;
+ }
+ return version2;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormatFactory.java Thu Jun 9 15:09:55 2011
@@ -39,6 +39,7 @@ public class OpenWireFormatFactory imple
private long maxInactivityDuration = 30*1000;
private long maxInactivityDurationInitalDelay = 10*1000;
private int cacheSize = 1024;
+ private long maxFrameSize = OpenWireFormat.DEFAULT_MAX_FRAME_SIZE;
public WireFormat createWireFormat() {
WireFormatInfo info = new WireFormatInfo();
@@ -53,6 +54,7 @@ public class OpenWireFormatFactory imple
info.setMaxInactivityDuration(maxInactivityDuration);
info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
info.setCacheSize(cacheSize);
+ info.setMaxFrameSize(maxFrameSize);
} catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
ise.initCause(e);
@@ -60,6 +62,7 @@ public class OpenWireFormatFactory imple
}
OpenWireFormat f = new OpenWireFormat(version);
+ f.setMaxFrameSize(maxFrameSize);
f.setPreferedWireFormatInfo(info);
return f;
}
@@ -136,4 +139,12 @@ public class OpenWireFormatFactory imple
long maxInactivityDurationInitalDelay) {
this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
}
+
+ public long getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(long maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Thu Jun 9 15:09:55 2011
@@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel;
import javax.net.SocketFactory;
import org.apache.activemq.command.Command;
+import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
@@ -115,9 +116,14 @@ public class NIOTransport extends TcpTra
// for it.
inputBuffer.flip();
nextFrameSize = inputBuffer.getInt() + 4;
- if (nextFrameSize > maxFrameSize) {
- throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+
+ if (wireFormat instanceof OpenWireFormat) {
+ long maxFrameSize = ((OpenWireFormat)wireFormat).getMaxFrameSize();
+ if (nextFrameSize > maxFrameSize) {
+ throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
+ }
}
+
if (nextFrameSize > inputBuffer.capacity()) {
currentBuffer = ByteBuffer.allocate(nextFrameSize);
currentBuffer.putInt(nextFrameSize);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Jun 9 15:09:55 2011
@@ -70,8 +70,6 @@ public class TcpTransport extends Transp
protected DataInputStream dataIn;
protected TimeStampStream buffOut = null;
- protected int maxFrameSize = 104857600; //100MB
-
/**
* The Traffic Class to be set on the socket.
@@ -323,14 +321,6 @@ public class TcpTransport extends Transp
return socketBufferSize;
}
- public int getMaxFrameSize() {
- return maxFrameSize;
- }
-
- public void setMaxFrameSize(int maxFrameSize) {
- this.maxFrameSize = maxFrameSize;
- }
-
/**
* Sets the buffer size to use on the socket
*/
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/WireformatNegociationTest.java Thu Jun 9 15:09:55 2011
@@ -152,7 +152,7 @@ public class WireformatNegociationTest e
/**
* @throws Exception
*/
- public void testWireFomatInfoSeverVersion1() throws Exception {
+ public void testWireFormatInfoSeverVersion1() throws Exception {
startServer("tcp://localhost:61616?wireFormat.version=1");
startClient("tcp://localhost:61616");
@@ -170,7 +170,7 @@ public class WireformatNegociationTest e
/**
* @throws Exception
*/
- public void testWireFomatInfoClientVersion1() throws Exception {
+ public void testWireFormatInfoClientVersion1() throws Exception {
startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616?wireFormat.version=1");
@@ -188,7 +188,7 @@ public class WireformatNegociationTest e
/**
* @throws Exception
*/
- public void testWireFomatInfoCurrentVersion() throws Exception {
+ public void testWireFormatInfoCurrentVersion() throws Exception {
startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616");
@@ -203,7 +203,7 @@ public class WireformatNegociationTest e
assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion());
}
- public void testWireFomatInactivityDurationInitalDelay() throws Exception {
+ public void testWireFormatInactivityDurationInitialDelay() throws Exception {
startServer("tcp://localhost:61616");
startClient("tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=60000");
@@ -218,4 +218,19 @@ public class WireformatNegociationTest e
assertEquals(CommandTypes.PROTOCOL_VERSION, serverWF.get().getVersion());
}
+ public void testWireFormatMaxFrameSize() throws Exception {
+
+ startServer("tcp://localhost:61616");
+ startClient("tcp://localhost:61616?wireFormat.maxFrameSize=1048576");
+
+ assertTrue("Connect timeout", negociationCounter.await(10, TimeUnit.SECONDS));
+ assertNull("Async error: " + asyncError, asyncError.get());
+
+ assertNotNull(clientWF.get());
+ assertEquals(1048576, clientWF.get().getMaxFrameSize());
+
+ assertNotNull(serverWF.get());
+ assertEquals(1048576, serverWF.get().getMaxFrameSize());
+ }
+
}
Modified: activemq/trunk/activemq-core/src/test/resources/activemq.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/activemq.xml?rev=1133915&r1=1133914&r2=1133915&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/activemq.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/activemq.xml Thu Jun 9 15:09:55 2011
@@ -28,7 +28,7 @@
<broker useJmx="false" xmlns="http://activemq.apache.org/schema/core" persistent="false">
<transportConnectors>
- <transportConnector uri="nio://localhost:61616?transport.maxFrameSize=10485760" />
+ <transportConnector uri="nio://localhost:61616?wireFormat.maxFrameSize=1048576" />
</transportConnectors>
</broker>