You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/15 16:09:29 UTC
svn commit: r1136055 - in
/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire:
OpenwireProtocolHandler.scala codec/OpenWireFormat.java
Author: tabish
Date: Wed Jun 15 14:09:28 2011
New Revision: 1136055
URL: http://svn.apache.org/viewvc?rev=1136055&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30
Add max frame size support.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1136055&r1=1136054&r2=1136055&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Jun 15 14:09:28 2011
@@ -57,7 +57,7 @@ object OpenwireProtocolHandler extends L
preferred_wireformat_settings.setMaxInactivityDuration(30 * 1000 * 1000);
preferred_wireformat_settings.setMaxInactivityDurationInitalDelay(10 * 1000 * 1000);
preferred_wireformat_settings.setCacheSize(1024);
-
+ preferred_wireformat_settings.setMaxFrameSize(OpenWireFormat.DEFAULT_MAX_FRAME_SIZE);
}
@@ -589,8 +589,6 @@ class OpenwireProtocolHandler extends Pr
true
}
}
-
-
}
object ack_handler {
@@ -659,7 +657,6 @@ class OpenwireProtocolHandler extends Pr
all_transactions.remove(id)
}
-
def apply(proc:(StoreUOW)=>Unit) = {
actions += proc
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java?rev=1136055&r1=1136054&r2=1136055&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/OpenWireFormat.java Wed Jun 15 14:09:28 2011
@@ -40,6 +40,7 @@ public final class OpenWireFormat {
public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_VERSION;
public static final String WIREFORMAT_NAME = "openwire";
+ public static final int DEFAULT_MAX_FRAME_SIZE = 100 * 1024 * 1024; //100 MB
static final byte NULL_TYPE = CommandTypes.NULL;
private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
@@ -52,6 +53,7 @@ public final class OpenWireFormat {
private boolean cacheEnabled;
private boolean tightEncodingEnabled;
private boolean sizePrefixDisabled;
+ private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
// The following fields are used for value caching
private short nextMarshallCacheIndex;
@@ -99,7 +101,7 @@ public final class OpenWireFormat {
public String toString() {
return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled=" + tightEncodingEnabled
- + ", sizePrefixDisabled=" + sizePrefixDisabled + "}";
+ + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}";
// return "OpenWireFormat{id="+id+",
// tightEncodingEnabled="+tightEncodingEnabled+"}";
}
@@ -201,6 +203,13 @@ public final class OpenWireFormat {
// throw new IOException("Packet size does not match marshaled
// size");
}
+
+ if (size > maxFrameSize) {
+ throw new IOException(
+ "Frame size of " + (size / (1024 * 1024)) +
+ " MB larger than max allowed " +
+ (maxFrameSize / (1024 * 1024)) + " MB");
+ }
}
Object command = doUnmarshal(bytesIn);
@@ -268,12 +277,13 @@ public final class OpenWireFormat {
public Object unmarshal(DataInput dis) throws IOException {
DataInput dataIn = dis;
if (!sizePrefixDisabled) {
- dis.readInt();
- // int size = dis.readInt();
- // byte[] data = new byte[size];
- // dis.readFully(data);
- // bytesIn.restart(data);
- // dataIn = bytesIn;
+ int size = dis.readInt();
+ if (size > maxFrameSize) {
+ throw new IOException(
+ "Frame size of " + (size / (1024 * 1024)) +
+ " MB larger than max allowed " +
+ (maxFrameSize / (1024 * 1024)) + " MB");
+ }
}
return doUnmarshal(dataIn);
}
@@ -576,6 +586,13 @@ public final class OpenWireFormat {
this.sizePrefixDisabled = prefixPacketSize;
}
+ public long getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ public void setMaxFrameSize(long maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ }
public void renegotiateWireFormat(WireFormatInfo info, WireFormatInfo preferedWireFormatInfo) throws IOException {
@@ -586,6 +603,9 @@ public final class OpenWireFormat {
this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
info.setVersion(this.getVersion());
+ this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
+ info.setMaxFrameSize(this.getMaxFrameSize());
+
this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
info.setStackTraceEnabled(this.stackTraceEnabled);
@@ -631,5 +651,11 @@ public final class OpenWireFormat {
}
return version2;
}
-
+
+ protected long min(long version1, long version2) {
+ if (version1 < version2 && version1 > 0 || version2 <= 0) {
+ return version1;
+ }
+ return version2;
+ }
}