You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/06/25 12:46:14 UTC
svn commit: r550449 [1/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/command/ main/java/org/apache/a...
Author: rajdavies
Date: Mon Jun 25 03:45:55 2007
New Revision: 550449
URL: http://svn.apache.org/viewvc?view=rev&rev=550449
Log:
Applying patch from http://issues.apache.org/activemq/browse/AMQ-1293
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/PropertiesBrokerFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ComparisonExpression.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ConstantExpression.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/UnaryExpression.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BaseDataStreamMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryIntPropertyEditor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java
activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSExclusiveConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSMessageTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSUsecaseTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeStreamletTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LoadTestBurnIn.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQBytesMessageTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQStreamMessageTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageAckTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreQueueTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyConnectorTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/SimpleSecurityBrokerSystemTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/message/NestedMapMessageTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/retroactive/DummyMessageQuery.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/fanout/FanoutTransportBrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/peer/PeerTransportTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/PropertiesBrokerFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/PropertiesBrokerFactory.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/PropertiesBrokerFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/PropertiesBrokerFactory.java Mon Jun 25 03:45:55 2007
@@ -76,6 +76,7 @@
}
}
properties.load(inputStream);
+ inputStream.close();
// should we append any system properties?
try {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagementContext.java Mon Jun 25 03:45:55 2007
@@ -387,7 +387,7 @@
mbeanServer.registerMBean(cl.newInstance(),namingServiceObjectName);
// mbeanServer.createMBean("mx4j.tools.naming.NamingService", namingServiceObjectName, null);
// set the naming port
- Attribute attr=new Attribute("Port",new Integer(connectorPort));
+ Attribute attr=new Attribute("Port",Integer.valueOf(connectorPort));
mbeanServer.setAttribute(namingServiceObjectName,attr);
}catch(Throwable e){
log.debug("Failed to create local registry",e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Mon Jun 25 03:45:55 2007
@@ -124,9 +124,9 @@
rc.put("JMSReplyTo", ""+m.getJMSReplyTo());
rc.put("JMSType", m.getJMSType());
rc.put("JMSDeliveryMode", m.getJMSDeliveryMode()==DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
- rc.put("JMSExpiration", new Long(m.getJMSExpiration()));
- rc.put("JMSPriority", new Integer(m.getJMSPriority()));
- rc.put("JMSRedelivered", new Boolean(m.getJMSRedelivered()));
+ rc.put("JMSExpiration", Long.valueOf(m.getJMSExpiration()));
+ rc.put("JMSPriority", Integer.valueOf(m.getJMSPriority()));
+ rc.put("JMSRedelivered", Boolean.valueOf(m.getJMSRedelivered()));
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
try {
rc.put("Properties", ""+m.getProperties());
@@ -155,9 +155,9 @@
long length=0;
try {
length = m.getBodyLength();
- rc.put("BodyLength", new Long(length));
+ rc.put("BodyLength", Long.valueOf(length));
} catch (JMSException e) {
- rc.put("BodyLength", new Long(0));
+ rc.put("BodyLength", Long.valueOf(0));
}
try {
byte preview[] = new byte[ (int)Math.min(length, 255) ];
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Jun 25 03:45:55 2007
@@ -119,9 +119,9 @@
MessageReference node=(MessageReference)iter.next();
Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
if(count!=null){
- redeliveredMessages.put(node.getMessageId(),new Integer(count.intValue()+1));
+ redeliveredMessages.put(node.getMessageId(),Integer.valueOf(count.intValue()+1));
}else{
- redeliveredMessages.put(node.getMessageId(),new Integer(1));
+ redeliveredMessages.put(node.getMessageId(),Integer.valueOf(1));
}
if(keepDurableSubsActive){
synchronized(pending){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java Mon Jun 25 03:45:55 2007
@@ -477,7 +477,7 @@
*/
public void setByte(String name, byte value) throws JMSException {
initializeWriting();
- put(name, new Byte(value));
+ put(name, Byte.valueOf(value));
}
/**
@@ -491,7 +491,7 @@
*/
public void setShort(String name, short value) throws JMSException {
initializeWriting();
- put(name, new Short(value));
+ put(name, Short.valueOf(value));
}
/**
@@ -505,7 +505,7 @@
*/
public void setChar(String name, char value) throws JMSException {
initializeWriting();
- put(name, new Character(value));
+ put(name, Character.valueOf(value));
}
/**
@@ -519,7 +519,7 @@
*/
public void setInt(String name, int value) throws JMSException {
initializeWriting();
- put(name, new Integer(value));
+ put(name, Integer.valueOf(value));
}
/**
@@ -533,7 +533,7 @@
*/
public void setLong(String name, long value) throws JMSException {
initializeWriting();
- put(name, new Long(value));
+ put(name, Long.valueOf(value));
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Mon Jun 25 03:45:55 2007
@@ -53,7 +53,6 @@
}
transient protected Callback acknowledgeCallback;
- transient int hashCode;
public Message copy() {
ActiveMQMessage copy = new ActiveMQMessage();
@@ -545,23 +544,23 @@
setBooleanProperty(name,value,true);
}
public void setBooleanProperty(String name, boolean value,boolean checkReadOnly) throws JMSException {
- setObjectProperty(name, value ? Boolean.TRUE : Boolean.FALSE,checkReadOnly);
+ setObjectProperty(name, Boolean.valueOf(value), checkReadOnly);
}
public void setByteProperty(String name, byte value) throws JMSException {
- setObjectProperty(name, new Byte(value));
+ setObjectProperty(name, Byte.valueOf(value));
}
public void setShortProperty(String name, short value) throws JMSException {
- setObjectProperty(name, new Short(value));
+ setObjectProperty(name, Short.valueOf(value));
}
public void setIntProperty(String name, int value) throws JMSException {
- setObjectProperty(name, new Integer(value));
+ setObjectProperty(name, Integer.valueOf(value));
}
public void setLongProperty(String name, long value) throws JMSException {
- setObjectProperty(name, new Long(value));
+ setObjectProperty(name, Long.valueOf(value));
}
public void setFloatProperty(String name, float value) throws JMSException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java Mon Jun 25 03:45:55 2007
@@ -819,16 +819,16 @@
return this.dataIn.readUTF();
}
if (type == MarshallingSupport.LONG_TYPE) {
- return new Long(this.dataIn.readLong());
+ return Long.valueOf(this.dataIn.readLong());
}
if (type == MarshallingSupport.INTEGER_TYPE) {
- return new Integer(this.dataIn.readInt());
+ return Integer.valueOf(this.dataIn.readInt());
}
if (type == MarshallingSupport.SHORT_TYPE) {
- return new Short(this.dataIn.readShort());
+ return Short.valueOf(this.dataIn.readShort());
}
if (type == MarshallingSupport.BYTE_TYPE) {
- return new Byte(this.dataIn.readByte());
+ return Byte.valueOf(this.dataIn.readByte());
}
if (type == MarshallingSupport.FLOAT_TYPE) {
return new Float(this.dataIn.readFloat());
@@ -840,7 +840,7 @@
return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
}
if (type == MarshallingSupport.CHAR_TYPE) {
- return new Character(this.dataIn.readChar());
+ return Character.valueOf(this.dataIn.readChar());
}
if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
int len = this.dataIn.readInt();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ComparisonExpression.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ComparisonExpression.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ComparisonExpression.java Mon Jun 25 03:45:55 2007
@@ -41,26 +41,26 @@
static final private HashSet REGEXP_CONTROL_CHARS = new HashSet();
static {
- REGEXP_CONTROL_CHARS.add(new Character('.'));
- REGEXP_CONTROL_CHARS.add(new Character('\\'));
- REGEXP_CONTROL_CHARS.add(new Character('['));
- REGEXP_CONTROL_CHARS.add(new Character(']'));
- REGEXP_CONTROL_CHARS.add(new Character('^'));
- REGEXP_CONTROL_CHARS.add(new Character('$'));
- REGEXP_CONTROL_CHARS.add(new Character('?'));
- REGEXP_CONTROL_CHARS.add(new Character('*'));
- REGEXP_CONTROL_CHARS.add(new Character('+'));
- REGEXP_CONTROL_CHARS.add(new Character('{'));
- REGEXP_CONTROL_CHARS.add(new Character('}'));
- REGEXP_CONTROL_CHARS.add(new Character('|'));
- REGEXP_CONTROL_CHARS.add(new Character('('));
- REGEXP_CONTROL_CHARS.add(new Character(')'));
- REGEXP_CONTROL_CHARS.add(new Character(':'));
- REGEXP_CONTROL_CHARS.add(new Character('&'));
- REGEXP_CONTROL_CHARS.add(new Character('<'));
- REGEXP_CONTROL_CHARS.add(new Character('>'));
- REGEXP_CONTROL_CHARS.add(new Character('='));
- REGEXP_CONTROL_CHARS.add(new Character('!'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('.'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('\\'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('['));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf(']'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('^'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('$'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('?'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('*'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('+'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('{'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('}'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('|'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('('));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf(')'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf(':'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('&'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('<'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('>'));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('='));
+ REGEXP_CONTROL_CHARS.add(Character.valueOf('!'));
}
static class LikeExpression extends UnaryExpression implements BooleanExpression {
@@ -354,13 +354,13 @@
if (lc != rc) {
if (lc == Byte.class) {
if (rc == Short.class) {
- lv = new Short(((Number) lv).shortValue());
+ lv = Short.valueOf(((Number) lv).shortValue());
}
else if (rc == Integer.class) {
- lv = new Integer(((Number) lv).intValue());
+ lv = Integer.valueOf(((Number) lv).intValue());
}
else if (rc == Long.class) {
- lv = new Long(((Number) lv).longValue());
+ lv = Long.valueOf(((Number) lv).longValue());
}
else if (rc == Float.class) {
lv = new Float(((Number) lv).floatValue());
@@ -373,10 +373,10 @@
}
} else if (lc == Short.class) {
if (rc == Integer.class) {
- lv = new Integer(((Number) lv).intValue());
+ lv = Integer.valueOf(((Number) lv).intValue());
}
else if (rc == Long.class) {
- lv = new Long(((Number) lv).longValue());
+ lv = Long.valueOf(((Number) lv).longValue());
}
else if (rc == Float.class) {
lv = new Float(((Number) lv).floatValue());
@@ -389,7 +389,7 @@
}
} else if (lc == Integer.class) {
if (rc == Long.class) {
- lv = new Long(((Number) lv).longValue());
+ lv = Long.valueOf(((Number) lv).longValue());
}
else if (rc == Float.class) {
lv = new Float(((Number) lv).floatValue());
@@ -403,7 +403,7 @@
}
else if (lc == Long.class) {
if (rc == Integer.class) {
- rv = new Long(((Number) rv).longValue());
+ rv = Long.valueOf(((Number) rv).longValue());
}
else if (rc == Float.class) {
lv = new Float(((Number) lv).floatValue());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ConstantExpression.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ConstantExpression.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/ConstantExpression.java Mon Jun 25 03:45:55 2007
@@ -60,25 +60,25 @@
long l = value.longValue();
if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
- value = new Integer(value.intValue());
+ value = Integer.valueOf(value.intValue());
}
return new ConstantExpression(value);
}
public static ConstantExpression createFromHex(String text) {
- Number value = new Long(Long.parseLong(text.substring(2), 16));
+ Number value = Long.valueOf(Long.parseLong(text.substring(2), 16));
long l = value.longValue();
if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
- value = new Integer(value.intValue());
+ value = Integer.valueOf(value.intValue());
}
return new ConstantExpression(value);
}
public static ConstantExpression createFromOctal(String text) {
- Number value = new Long(Long.parseLong(text, 8));
+ Number value = Long.valueOf(Long.parseLong(text, 8));
long l = value.longValue();
if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
- value = new Integer(value.intValue());
+ value = Integer.valueOf(value.intValue());
}
return new ConstantExpression(value);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/PropertyExpression.java Mon Jun 25 03:45:55 2007
@@ -66,12 +66,12 @@
});
JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubExpression() {
public Object evaluate(Message message) {
- return new Integer(message.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT );
+ return Integer.valueOf(message.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT );
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubExpression() {
public Object evaluate(Message message) {
- return new Integer(message.getPriority());
+ return Integer.valueOf(message.getPriority());
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSMessageID", new SubExpression() {
@@ -83,7 +83,7 @@
});
JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubExpression() {
public Object evaluate(Message message) {
- return new Long(message.getTimestamp());
+ return Long.valueOf(message.getTimestamp());
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubExpression() {
@@ -93,28 +93,28 @@
});
JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubExpression() {
public Object evaluate(Message message) {
- return new Long(message.getExpiration());
+ return Long.valueOf(message.getExpiration());
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubExpression() {
public Object evaluate(Message message) {
- return new Integer(message.getPriority());
+ return Integer.valueOf(message.getPriority());
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubExpression() {
public Object evaluate(Message message) {
- return new Long(message.getTimestamp());
+ return Long.valueOf(message.getTimestamp());
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubExpression() {
public Object evaluate(Message message) {
- return new Boolean(message.isRedelivered());
+ return Boolean.valueOf(message.isRedelivered());
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSXDeliveryCount", new SubExpression() {
public Object evaluate(Message message) {
- return new Integer(message.getRedeliveryCounter()+1);
+ return Integer.valueOf(message.getRedeliveryCounter()+1);
}
});
JMS_PROPERTY_EXPRESSIONS.put("JMSXGroupID", new SubExpression() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/UnaryExpression.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/UnaryExpression.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/filter/UnaryExpression.java Mon Jun 25 03:45:55 2007
@@ -195,7 +195,7 @@
bd = bd.negate();
if( BD_LONG_MIN_VALUE.compareTo(bd)==0 ) {
- return new Long(Long.MIN_VALUE);
+ return Long.valueOf(Long.MIN_VALUE);
}
return bd;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Mon Jun 25 03:45:55 2007
@@ -51,7 +51,7 @@
private static final Log log=LogFactory.getLog(AsyncDataManager.class);
- public static int CONTROL_RECORD_MAX_LENGTH=1024;
+ public static final int CONTROL_RECORD_MAX_LENGTH=1024;
public static final int ITEM_HEAD_RESERVED_SPACE=21;
// ITEM_HEAD_SPACE = length + type+ reserved space + SOR
@@ -67,9 +67,9 @@
public static final byte DATA_ITEM_TYPE=1;
public static final byte REDO_ITEM_TYPE=2;
- public static String DEFAULT_DIRECTORY="data";
- public static String DEFAULT_FILE_PREFIX="data-";
- public static int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
+ public static final String DEFAULT_DIRECTORY="data";
+ public static final String DEFAULT_FILE_PREFIX="data-";
+ public static final int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
private File directory = new File(DEFAULT_DIRECTORY);
private String filePrefix=DEFAULT_FILE_PREFIX;
@@ -314,7 +314,7 @@
public synchronized void addInterestInFile(int file) throws IOException{
if(file>=0){
- Integer key=new Integer(file);
+ Integer key=Integer.valueOf(file);
DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile==null){
throw new IOException("That data file does not exist");
@@ -331,7 +331,7 @@
public synchronized void removeInterestInFile(int file) throws IOException{
if(file>=0){
- Integer key=new Integer(file);
+ Integer key=Integer.valueOf(file);
DataFile dataFile=(DataFile) fileMap.get(key);
removeInterestInFile(dataFile);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Mon Jun 25 03:45:55 2007
@@ -352,7 +352,7 @@
write = (WriteCommand) write.getNext();
}
}
-
+ buff.close();
} catch (IOException e) {
synchronized( enqueueMutex ) {
firstAsyncException = e;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Mon Jun 25 03:45:55 2007
@@ -41,7 +41,7 @@
public final class DataManagerImpl implements DataManager {
private static final Log log=LogFactory.getLog(DataManagerImpl.class);
- public static long MAX_FILE_LENGTH=1024*1024*32;
+ public static final long MAX_FILE_LENGTH=1024*1024*32;
private static final String NAME_PREFIX="data-";
private final File dir;
private final String name;
@@ -239,7 +239,7 @@
*/
public synchronized void addInterestInFile(int file) throws IOException{
if(file>=0){
- Integer key=new Integer(file);
+ Integer key=Integer.valueOf(file);
DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile==null){
dataFile=createAndAddDataFile(file);
@@ -259,7 +259,7 @@
*/
public synchronized void removeInterestInFile(int file) throws IOException{
if(file>=0){
- Integer key=new Integer(file);
+ Integer key=Integer.valueOf(file);
DataFile dataFile=(DataFile) fileMap.get(key);
removeInterestInFile(dataFile);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Mon Jun 25 03:45:55 2007
@@ -151,7 +151,7 @@
return result;
}
- long getLength(){
+ synchronized long getLength(){
return length;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Mon Jun 25 03:45:55 2007
@@ -72,9 +72,6 @@
private boolean dispatchAsync;
private String destinationFilter = ">";
- private int queueDispatched;
- private int topicDispatched;
-
BrokerId localBrokerId;
BrokerId remoteBrokerId;
private NetworkBridgeFailedListener bridgeFailedListener;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/BaseDataStreamMarshaller.java Mon Jun 25 03:45:55 2007
@@ -183,7 +183,7 @@
tightUnmarshalString(dataIn, bs),
tightUnmarshalString(dataIn, bs),
tightUnmarshalString(dataIn, bs),
- new Integer(dataIn.readInt())
+ Integer.valueOf(dataIn.readInt())
});
} catch (IOException e) {
throw e;
@@ -484,7 +484,7 @@
looseUnmarshalString(dataIn),
looseUnmarshalString(dataIn),
looseUnmarshalString(dataIn),
- new Integer(dataIn.readInt())
+ Integer.valueOf(dataIn.readInt())
});
} catch (IOException e) {
throw e;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BaseDataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BaseDataStreamMarshaller.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BaseDataStreamMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v3/BaseDataStreamMarshaller.java Mon Jun 25 03:45:55 2007
@@ -182,7 +182,7 @@
tightUnmarshalString(dataIn, bs),
tightUnmarshalString(dataIn, bs),
tightUnmarshalString(dataIn, bs),
- new Integer(dataIn.readInt())
+ Integer.valueOf(dataIn.readInt())
});
} catch (IOException e) {
throw e;
@@ -483,7 +483,7 @@
looseUnmarshalString(dataIn),
looseUnmarshalString(dataIn),
looseUnmarshalString(dataIn),
- new Integer(dataIn.readInt())
+ Integer.valueOf(dataIn.readInt())
});
} catch (IOException e) {
throw e;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationEntry.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/AuthorizationEntry.java Mon Jun 25 03:45:55 2007
@@ -108,7 +108,7 @@
paramClass[0] = String.class;
Object[] param = new Object[1];
- param[0] = new String(name);
+ param[0] = name;
try {
Class cls = Class.forName(groupClass);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java Mon Jun 25 03:45:55 2007
@@ -133,8 +133,8 @@
String queueSearchSubtree = (String) options.get(QUEUE_SEARCH_SUBTREE);
topicSearchMatchingFormat = new MessageFormat(topicSearchMatching);
queueSearchMatchingFormat = new MessageFormat(queueSearchMatching);
- topicSearchSubtreeBool = new Boolean(topicSearchSubtree).booleanValue();
- queueSearchSubtreeBool = new Boolean(queueSearchSubtree).booleanValue();
+ topicSearchSubtreeBool = Boolean.valueOf(topicSearchSubtree).booleanValue();
+ queueSearchSubtreeBool = Boolean.valueOf(queueSearchSubtree).booleanValue();
}
public Set getTempDestinationAdminACLs() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Mon Jun 25 03:45:55 2007
@@ -162,7 +162,7 @@
}
synchronized void addInterestInRecordFile(int recordNumber) {
- Integer key = new Integer(recordNumber);
+ Integer key = Integer.valueOf(recordNumber);
AtomicInteger rr = recordReferences.get(key);
if (rr == null) {
rr = new AtomicInteger();
@@ -172,7 +172,7 @@
}
synchronized void removeInterestInRecordFile(int recordNumber) {
- Integer key = new Integer(recordNumber);
+ Integer key = Integer.valueOf(recordNumber);
AtomicInteger rr = recordReferences.get(key);
if (rr != null && rr.decrementAndGet() <= 0) {
recordReferences.remove(key);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Mon Jun 25 03:45:55 2007
@@ -217,7 +217,7 @@
if(msg!=null){
recoverReference(listener,msg);
count++;
- container.setBatchEntry(msg.getMessageId().toString(),entry);
+ container.setBatchEntry(msg.getMessageId(),entry);
}else {
container.reset();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Mon Jun 25 03:45:55 2007
@@ -41,6 +41,7 @@
}
/**
+ * @param id
* @param batchEntry the batchEntry to set
*/
public void setBatchEntry(String id,StoreEntry batchEntry) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Mon Jun 25 03:45:55 2007
@@ -30,7 +30,7 @@
public class Scheduler {
- static public ScheduledThreadPoolExecutor clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory(){
+ public static final ScheduledThreadPoolExecutor clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory(){
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable,"ActiveMQ Scheduler");
thread.setDaemon(true);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Mon Jun 25 03:45:55 2007
@@ -84,7 +84,7 @@
Response response=(Response)command;
FutureResponse future=null;
synchronized(requestMap){
- future=(FutureResponse)requestMap.remove(new Integer(response.getCorrelationId()));
+ future=(FutureResponse)requestMap.remove(Integer.valueOf(response.getCorrelationId()));
}
if(future!=null){
future.set(response);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java Mon Jun 25 03:45:55 2007
@@ -51,10 +51,10 @@
int max = size - 1;
while (map.size() >= max) {
// lets find things to evict
- Object evictedBuffer = map.remove(new Integer(++lowestCommandId));
+ Object evictedBuffer = map.remove(Integer.valueOf(++lowestCommandId));
onEvictedBuffer(lowestCommandId, evictedBuffer);
}
- map.put(new Integer(commandId), buffer);
+ map.put(Integer.valueOf(commandId), buffer);
}
}
@@ -72,7 +72,7 @@
for (int i = fromCommandId; i <= toCommandId; i++) {
Object buffer = null;
synchronized (lock) {
- buffer = map.get(new Integer(i));
+ buffer = map.get(Integer.valueOf(i));
}
replayer.sendBuffer(i, buffer);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Mon Jun 25 03:45:55 2007
@@ -114,7 +114,7 @@
command.setCommandId(generateCommandId());
if(handler!=null) {
command.setResponseRequired(true);
- resposeHandlers.put(new Integer(command.getCommandId()), handler);
+ resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
}
transportFilter.sendToActiveMQ(command);
}
@@ -472,7 +472,7 @@
if ( command.isResponse() ) {
Response response = (Response) command;
- ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(new Integer(response.getCorrelationId()));
+ ResponseHandler rh = (ResponseHandler) resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
if( rh !=null ) {
rh.onResponse(this, response);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Mon Jun 25 03:45:55 2007
@@ -189,6 +189,7 @@
throw new ProtocolException(errorMessage, true);
baos.write(b);
}
+ baos.close();
ByteSequence sequence = baos.toByteSequence();
return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Mon Jun 25 03:45:55 2007
@@ -165,9 +165,9 @@
}
else {
HashMap options = new HashMap();
- options.put("maxInactivityDuration", new Long(maxInactivityDuration));
- options.put("minmumWireFormatVersion", new Integer(minmumWireFormatVersion));
- options.put("trace", new Boolean(trace));
+ options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
+ options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
+ options.put("trace", Boolean.valueOf(trace));
options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MarshallingSupport.java Mon Jun 25 03:45:55 2007
@@ -155,22 +155,22 @@
Object value=null;
switch( in.readByte() ) {
case BYTE_TYPE:
- value = new Byte(in.readByte());
+ value = Byte.valueOf(in.readByte());
break;
case BOOLEAN_TYPE:
value = in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
break;
case CHAR_TYPE:
- value = new Character(in.readChar());
+ value = Character.valueOf(in.readChar());
break;
case SHORT_TYPE:
- value = new Short(in.readShort());
+ value = Short.valueOf(in.readShort());
break;
case INTEGER_TYPE:
- value = new Integer(in.readInt());
+ value = Integer.valueOf(in.readInt());
break;
case LONG_TYPE:
- value = new Long(in.readLong());
+ value = Long.valueOf(in.readLong());
break;
case FLOAT_TYPE:
value = new Float(in.readFloat());
@@ -378,6 +378,7 @@
DataByteArrayOutputStream dataOut=new DataByteArrayOutputStream();
props.store(dataOut,"");
result=new String(dataOut.getData(),0,dataOut.size());
+ dataOut.close();
}
return result;
}
@@ -387,6 +388,7 @@
if (str != null && str.length() > 0 ) {
DataByteArrayInputStream dataIn = new DataByteArrayInputStream(str.getBytes());
result.load(dataIn);
+ dataIn.close();
}
return result;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryIntPropertyEditor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryIntPropertyEditor.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryIntPropertyEditor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryIntPropertyEditor.java Mon Jun 25 03:45:55 2007
@@ -32,28 +32,28 @@
Pattern p = Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$",Pattern.CASE_INSENSITIVE);
Matcher m = p.matcher(text);
if (m.matches()) {
- setValue(new Integer(Integer.parseInt(m.group(1))));
+ setValue(Integer.valueOf(Integer.parseInt(m.group(1))));
return;
}
p = Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$",Pattern.CASE_INSENSITIVE);
m = p.matcher(text);
if (m.matches()) {
- setValue(new Integer(Integer.parseInt(m.group(1)) * 1024));
+ setValue(Integer.valueOf(Integer.parseInt(m.group(1)) * 1024));
return;
}
p = Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE);
m = p.matcher(text);
if (m.matches()) {
- setValue(new Integer(Integer.parseInt(m.group(1)) * 1024 * 1024 ));
+ setValue(Integer.valueOf(Integer.parseInt(m.group(1)) * 1024 * 1024 ));
return;
}
p = Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE);
m = p.matcher(text);
if (m.matches()) {
- setValue(new Integer(Integer.parseInt(m.group(1)) * 1024 * 1024 * 1024 ));
+ setValue(Integer.valueOf(Integer.parseInt(m.group(1)) * 1024 * 1024 * 1024 ));
return;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MemoryPropertyEditor.java Mon Jun 25 03:45:55 2007
@@ -32,28 +32,28 @@
Pattern p = Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$",Pattern.CASE_INSENSITIVE);
Matcher m = p.matcher(text);
if (m.matches()) {
- setValue(new Long(Long.parseLong(m.group(1))));
+ setValue(Long.valueOf(Long.parseLong(m.group(1))));
return;
}
p = Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$",Pattern.CASE_INSENSITIVE);
m = p.matcher(text);
if (m.matches()) {
- setValue(new Long(Long.parseLong(m.group(1)) * 1024));
+ setValue(Long.valueOf(Long.parseLong(m.group(1)) * 1024));
return;
}
p = Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE);
m = p.matcher(text);
if (m.matches()) {
- setValue(new Long(Long.parseLong(m.group(1)) * 1024 * 1024 ));
+ setValue(Long.valueOf(Long.parseLong(m.group(1)) * 1024 * 1024 ));
return;
}
p = Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE);
m = p.matcher(text);
if (m.matches()) {
- setValue(new Long(Long.parseLong(m.group(1)) * 1024 * 1024 * 1024 ));
+ setValue(Long.valueOf(Long.parseLong(m.group(1)) * 1024 * 1024 * 1024 ));
return;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/TypeConversionSupport.java Mon Jun 25 03:45:55 2007
@@ -102,7 +102,7 @@
Converter longConverter = new Converter() {
public Object convert(Object value) {
- return new Long(((Number) value).longValue());
+ return Long.valueOf(((Number) value).longValue());
}
};
CONVERSION_MAP.put(new ConversionKey(Byte.class, Long.class), longConverter);
@@ -110,13 +110,13 @@
CONVERSION_MAP.put(new ConversionKey(Integer.class, Long.class), longConverter);
CONVERSION_MAP.put(new ConversionKey(Date.class, Long.class), new Converter() {
public Object convert(Object value) {
- return new Long(((Date) value).getTime());
+ return Long.valueOf(((Date) value).getTime());
}
});
Converter intConverter = new Converter() {
public Object convert(Object value) {
- return new Integer(((Number) value).intValue());
+ return Integer.valueOf(((Number) value).intValue());
}
};
CONVERSION_MAP.put(new ConversionKey(Byte.class, Integer.class), intConverter);
@@ -124,7 +124,7 @@
CONVERSION_MAP.put(new ConversionKey(Byte.class, Short.class), new Converter() {
public Object convert(Object value) {
- return new Short(((Number) value).shortValue());
+ return Short.valueOf(((Number) value).shortValue());
}
});
Modified: activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/eclipse-resources/log4j.properties Mon Jun 25 03:45:55 2007
@@ -18,12 +18,20 @@
#
# The logging properties used for eclipse testing, We want to see debug output on the console.
#
-log4j.rootLogger=WARN, out
+log4j.rootLogger=INFO, out
+
-log4j.logger.org.apache.activemq=DEBUG
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.fout=org.apache.log4j.FileAppender
+log4j.appender.fout.layout=org.apache.log4j.PatternLayout
+log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.fout.file=target/amq-testlog.log
+log4j.appender.fout.append=true
+
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=diff&rev=550449&r1=550448&r2=550449
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java Mon Jun 25 03:45:55 2007
@@ -1,438 +1,438 @@
-package org.apache.activemq;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.memory.UsageManager;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.pool.PooledConnectionFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-
-
-public class AMQDeadlockTest3 extends TestCase {
- private static final transient Log log = LogFactory.getLog(AMQDeadlockTest3.class);
-
- private static final String URL1 = "tcp://localhost:61616";
-
- private static final String URL2 = "tcp://localhost:61617";
-
- private static final String QUEUE1_NAME = "test.queue.1";
-
- private static final String QUEUE2_NAME = "test.queue.2";
-
- private static final int MAX_CONSUMERS = 1;
-
- private static final int MAX_PRODUCERS = 1;
-
- private static final int NUM_MESSAGE_TO_SEND = 10;
-
- private AtomicInteger messageCount = new AtomicInteger();
- private CountDownLatch doneLatch;
-
- public void setUp() throws Exception {
- }
-
- public void tearDown() throws Exception {
- }
-
- // This should fail with incubator-activemq-fuse-4.1.0.5
- public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
-
- BrokerService brokerService1 = null;
- ActiveMQConnectionFactory acf = null;
- PooledConnectionFactory pcf = null;
- DefaultMessageListenerContainer container1 = null;
-
- try {
- brokerService1 = createBrokerService("broker1", URL1, null);
- brokerService1.start();
-
- acf = createConnectionFactory(URL1);
- pcf = new PooledConnectionFactory(acf);
-
- // Only listen on the first queue.. let the 2nd queue fill up.
- doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
- container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
- container1.afterPropertiesSet();
-
- Thread.sleep(2000);
-
- final ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < MAX_PRODUCERS; i++) {
- executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
- Thread.sleep(1000);
- executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
- }
-
- // Wait for all message to arrive.
- assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
- executor.shutdownNow();
-
- Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
-
- } finally {
-
- container1.stop();
- container1.destroy();
- container1 = null;
- brokerService1.stop();
- brokerService1 = null;
-
- }
-
- }
-
-
-
-
- // This should fail with incubator-activemq-fuse-4.1.0.5
- public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
- throws Exception {
-
- BrokerService brokerService1 = null;
- BrokerService brokerService2 = null;
- ActiveMQConnectionFactory acf1 = null;
- ActiveMQConnectionFactory acf2 = null;
- PooledConnectionFactory pcf = null;
- DefaultMessageListenerContainer container1 = null;
-
- try {
- brokerService1 = createBrokerService("broker1", URL1, URL2);
- brokerService1.start();
- brokerService2 = createBrokerService("broker2", URL2, URL1);
- brokerService2.start();
-
- acf1 = createConnectionFactory(URL1);
- acf2 = createConnectionFactory(URL2);
-
- pcf = new PooledConnectionFactory(acf1);
-
- Thread.sleep(1000);
-
- doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
- container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
- container1.afterPropertiesSet();
-
- final ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < MAX_PRODUCERS; i++) {
- executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
- Thread.sleep(1000);
- executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
- }
-
- assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
- executor.shutdownNow();
-
- Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
- messageCount.get());
- } finally {
-
- container1.stop();
- container1.destroy();
- container1 = null;
-
- brokerService1.stop();
- brokerService1 = null;
- brokerService2.stop();
- brokerService2 = null;
- }
- }
-
-
- // This should fail with incubator-activemq-fuse-4.1.0.5
- public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
- throws Exception {
-
- BrokerService brokerService1 = null;
- BrokerService brokerService2 = null;
- ActiveMQConnectionFactory acf1 = null;
- ActiveMQConnectionFactory acf2 = null;
- DefaultMessageListenerContainer container1 = null;
- DefaultMessageListenerContainer container2 = null;
-
- try {
- brokerService1 = createBrokerService("broker1", URL1, URL2);
- brokerService1.start();
- brokerService2 = createBrokerService("broker2", URL2, URL1);
- brokerService2.start();
-
- acf1 = createConnectionFactory(URL1);
- acf2 = createConnectionFactory(URL2);
-
- Thread.sleep(1000);
-
- doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
-
- container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
- container1.afterPropertiesSet();
- container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
- container2.afterPropertiesSet();
-
- final ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < MAX_PRODUCERS; i++) {
- executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
- Thread.sleep(1000);
- executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
- }
-
- assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
- executor.shutdownNow();
-
- Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
- } finally {
-
- container1.stop();
- container1.destroy();
- container1 = null;
-
- container2.stop();
- container2.destroy();
- container2 = null;
-
- brokerService1.stop();
- brokerService1 = null;
- brokerService2.stop();
- brokerService2 = null;
- }
- }
-
-
-
-
- private BrokerService createBrokerService(final String brokerName,
- final String uri1, final String uri2) throws Exception {
- final BrokerService brokerService = new BrokerService();
-
- brokerService.setBrokerName(brokerName);
- brokerService.setPersistent(false);
- brokerService.setUseJmx(true);
-
- final UsageManager memoryManager = new UsageManager();
- memoryManager.setLimit(5000000);
- brokerService.setMemoryManager(memoryManager);
-
- final ArrayList policyEntries = new ArrayList();
-
- final PolicyEntry entry = new PolicyEntry();
- entry.setQueue(">");
- // entry.setQueue(QUEUE1_NAME);
- entry.setMemoryLimit(1000);
- policyEntries.add(entry);
-
- final PolicyMap policyMap = new PolicyMap();
- policyMap.setPolicyEntries(policyEntries);
- brokerService.setDestinationPolicy(policyMap);
-
- final TransportConnector tConnector = new TransportConnector();
- tConnector.setUri(new URI(uri1));
- tConnector.setBrokerName(brokerName);
- tConnector.setName(brokerName + ".transportConnector");
- brokerService.addConnector(tConnector);
-
- if (uri2 != null) {
- final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
- nc.setBridgeTempDestinations(true);
- nc.setBrokerName(brokerName);
- brokerService.addNetworkConnector(nc);
- }
-
- return brokerService;
-
- }
-
- public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
- final ConnectionFactory acf, final MessageListener listener,
- final String queue) {
- final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
- container.setConnectionFactory(acf);
- container.setDestinationName(queue);
- container.setMessageListener(listener);
- container.setSessionTransacted(false);
- container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
- container.setConcurrentConsumers(MAX_CONSUMERS);
- return container;
- }
-
- public ActiveMQConnectionFactory createConnectionFactory(final String url) {
- final ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
- acf.setCopyMessageOnSend(false);
- acf.setUseAsyncSend(false);
- acf.setDispatchAsync(true);
- acf.setUseCompression(false);
- acf.setOptimizeAcknowledge(false);
- acf.setOptimizedMessageDispatch(true);
- acf.setAlwaysSyncSend(true);
- return acf;
- }
-
- private class TestMessageListener1 implements MessageListener {
-
- private final long waitTime;
-
- public TestMessageListener1(long waitTime) {
- this.waitTime = waitTime;
-
- }
-
- public void onMessage(Message msg) {
-
- try {
- log.info("Listener1 Consumed message "+ msg.getIntProperty("count"));
-
- messageCount.incrementAndGet();
- doneLatch.countDown();
-
- Thread.sleep(waitTime);
- } catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- }
-
-
- private class PooledProducerTask implements Runnable {
-
- private final String queueName;
-
- private final PooledConnectionFactory pcf;
-
- public PooledProducerTask(final PooledConnectionFactory pcf,
- final String queueName) {
- this.pcf = pcf;
- this.queueName = queueName;
- }
-
- public void run() {
-
- try {
-
- final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
- jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- jmsTemplate.setExplicitQosEnabled(true);
- jmsTemplate.setMessageIdEnabled(false);
- jmsTemplate.setMessageTimestampEnabled(false);
- jmsTemplate.afterPropertiesSet();
-
- final byte[] bytes = new byte[2048];
- final Random r = new Random();
- r.nextBytes(bytes);
-
- Thread.sleep(2000);
-
- final AtomicInteger count = new AtomicInteger();
- for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
- jmsTemplate.send(queueName, new MessageCreator() {
-
- public Message createMessage(Session session)
- throws JMSException {
-
- final BytesMessage message = session.createBytesMessage();
-
- message.writeBytes(bytes);
- message.setIntProperty("count", count.incrementAndGet());
- message.setStringProperty("producer", "pooled");
- return message;
- }
- });
-
- log.info("PooledProducer sent message: "+ count.get());
- // Thread.sleep(1000);
- }
-
- } catch (final Throwable e) {
- log.error("Producer 1 is exiting", e);
- }
- }
- }
-
-
- private class NonPooledProducerTask implements Runnable {
-
- private final String queueName;
-
- private final ConnectionFactory cf;
-
- public NonPooledProducerTask(final ConnectionFactory cf,
- final String queueName) {
- this.cf = cf;
- this.queueName = queueName;
- }
-
- public void run() {
-
- try {
-
- final JmsTemplate jmsTemplate = new JmsTemplate(cf);
- jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- jmsTemplate.setExplicitQosEnabled(true);
- jmsTemplate.setMessageIdEnabled(false);
- jmsTemplate.setMessageTimestampEnabled(false);
- jmsTemplate.afterPropertiesSet();
-
- final byte[] bytes = new byte[2048];
- final Random r = new Random();
- r.nextBytes(bytes);
-
- Thread.sleep(2000);
-
- final AtomicInteger count = new AtomicInteger();
- for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
- jmsTemplate.send(queueName, new MessageCreator() {
-
- public Message createMessage(Session session)
- throws JMSException {
-
- final BytesMessage message = session
- .createBytesMessage();
-
- message.writeBytes(bytes);
- message.setIntProperty("count", count
- .incrementAndGet());
- message.setStringProperty("producer", "non-pooled");
- return message;
- }
- });
-
- log.info("Non-PooledProducer sent message: " + count.get());
-
- // Thread.sleep(1000);
- }
-
- } catch (final Throwable e) {
- log.error("Producer 1 is exiting", e);
- }
- }
- }
-
-}
+package org.apache.activemq;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.pool.PooledConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+
+public class AMQDeadlockTest3 extends TestCase {
+ private static final transient Log log = LogFactory.getLog(AMQDeadlockTest3.class);
+
+ private static final String URL1 = "tcp://localhost:61616";
+
+ private static final String URL2 = "tcp://localhost:61617";
+
+ private static final String QUEUE1_NAME = "test.queue.1";
+
+ private static final String QUEUE2_NAME = "test.queue.2";
+
+ private static final int MAX_CONSUMERS = 1;
+
+ private static final int MAX_PRODUCERS = 1;
+
+ private static final int NUM_MESSAGE_TO_SEND = 10;
+
+ private AtomicInteger messageCount = new AtomicInteger();
+ private CountDownLatch doneLatch;
+
+ public void setUp() throws Exception { // intentionally empty: each test method creates and starts its own brokers
+ }
+
+ public void tearDown() throws Exception { // intentionally empty: each test method stops its containers and brokers in its own finally block
+ }
+
+ /**
+  * Runs one broker, submits one pooled producer per queue and consumes only
+  * {@code QUEUE1_NAME}, so the second queue fills up. Passes when
+  * {@code NUM_MESSAGE_TO_SEND} messages reach the listener within 20 seconds.
+  * <p>
+  * This should fail with incubator-activemq-fuse-4.1.0.5
+  *
+  * @throws Exception if the broker or the listener container cannot be started, or the
+  *         test thread is interrupted while waiting
+  */
+ public void testQueueLimitsWithOneBrokerSameConnection() throws Exception {
+
+     BrokerService brokerService1 = null;
+     DefaultMessageListenerContainer container1 = null;
+     ExecutorService executor = null;
+
+     try {
+         brokerService1 = createBrokerService("broker1", URL1, null);
+         brokerService1.start();
+
+         final ActiveMQConnectionFactory acf = createConnectionFactory(URL1);
+         final PooledConnectionFactory pcf = new PooledConnectionFactory(acf);
+
+         // Only listen on the first queue.. let the 2nd queue fill up.
+         doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND);
+         container1 = createDefaultMessageListenerContainer(acf, new TestMessageListener1(500), QUEUE1_NAME);
+         container1.afterPropertiesSet();
+
+         Thread.sleep(2000);
+
+         executor = Executors.newCachedThreadPool();
+         for (int i = 0; i < MAX_PRODUCERS; i++) {
+             executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+             Thread.sleep(1000);
+             executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+         }
+
+         // Wait for all message to arrive.
+         assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+
+         Assert.assertEquals(NUM_MESSAGE_TO_SEND, messageCount.get());
+
+     } finally {
+         // Cleanup is null-guarded and chained with finally so that a failure during setup
+         // (or during one cleanup step) is not replaced by a NullPointerException and does
+         // not leave producer threads or the broker running.
+         if (executor != null) {
+             executor.shutdownNow();
+         }
+         try {
+             if (container1 != null) {
+                 container1.stop();
+                 container1.destroy();
+             }
+         } finally {
+             if (brokerService1 != null) {
+                 brokerService1.stop();
+             }
+         }
+     }
+
+ }
+
+
+
+
+ /**
+  * Runs two brokers, each created with a network connector pointing at the other's URL.
+  * Producers for both queues share one pooled connection factory on the first broker;
+  * the only listener consumes {@code QUEUE1_NAME} from the second broker. Passes when
+  * {@code MAX_PRODUCERS * NUM_MESSAGE_TO_SEND} messages reach the listener within 20 seconds.
+  * <p>
+  * This should fail with incubator-activemq-fuse-4.1.0.5
+  *
+  * @throws Exception if a broker or the listener container cannot be started, or the
+  *         test thread is interrupted while waiting
+  */
+ public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithOneConnectionForProducing()
+         throws Exception {
+
+     BrokerService brokerService1 = null;
+     BrokerService brokerService2 = null;
+     DefaultMessageListenerContainer container1 = null;
+     ExecutorService executor = null;
+
+     try {
+         brokerService1 = createBrokerService("broker1", URL1, URL2);
+         brokerService1.start();
+         brokerService2 = createBrokerService("broker2", URL2, URL1);
+         brokerService2.start();
+
+         final ActiveMQConnectionFactory acf1 = createConnectionFactory(URL1);
+         final ActiveMQConnectionFactory acf2 = createConnectionFactory(URL2);
+
+         final PooledConnectionFactory pcf = new PooledConnectionFactory(acf1);
+
+         Thread.sleep(1000);
+
+         doneLatch = new CountDownLatch(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND);
+         container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+         container1.afterPropertiesSet();
+
+         executor = Executors.newCachedThreadPool();
+         for (int i = 0; i < MAX_PRODUCERS; i++) {
+             executor.submit(new PooledProducerTask(pcf, QUEUE2_NAME));
+             Thread.sleep(1000);
+             executor.submit(new PooledProducerTask(pcf, QUEUE1_NAME));
+         }
+
+         assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+
+         Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND,
+                 messageCount.get());
+     } finally {
+         // Cleanup is null-guarded and chained with finally so that a failure during setup
+         // (or during one cleanup step) is not replaced by a NullPointerException and does
+         // not leave producer threads or a broker running.
+         if (executor != null) {
+             executor.shutdownNow();
+         }
+         try {
+             if (container1 != null) {
+                 container1.stop();
+                 container1.destroy();
+             }
+         } finally {
+             try {
+                 if (brokerService1 != null) {
+                     brokerService1.stop();
+                 }
+             } finally {
+                 if (brokerService2 != null) {
+                     brokerService2.stop();
+                 }
+             }
+         }
+     }
+ }
+
+
+ /**
+  * Runs two brokers, each created with a network connector pointing at the other's URL.
+  * Non-pooled producers for both queues send through the first broker; the second broker
+  * hosts a listener on {@code QUEUE1_NAME} (500 ms per message) and one on
+  * {@code QUEUE2_NAME} (30000 ms per message). Passes when the shared latch of
+  * {@code NUM_MESSAGE_TO_SEND * MAX_PRODUCERS} is released within 20 seconds.
+  * <p>
+  * This should fail with incubator-activemq-fuse-4.1.0.5
+  *
+  * @throws Exception if a broker or a listener container cannot be started, or the
+  *         test thread is interrupted while waiting
+  */
+ public void testQueueLimitsWithTwoBrokerProduceandConsumeonDifferentBrokersWithSeperateConnectionsForProducing()
+         throws Exception {
+
+     BrokerService brokerService1 = null;
+     BrokerService brokerService2 = null;
+     DefaultMessageListenerContainer container1 = null;
+     DefaultMessageListenerContainer container2 = null;
+     ExecutorService executor = null;
+
+     try {
+         brokerService1 = createBrokerService("broker1", URL1, URL2);
+         brokerService1.start();
+         brokerService2 = createBrokerService("broker2", URL2, URL1);
+         brokerService2.start();
+
+         final ActiveMQConnectionFactory acf1 = createConnectionFactory(URL1);
+         final ActiveMQConnectionFactory acf2 = createConnectionFactory(URL2);
+
+         Thread.sleep(1000);
+
+         // NOTE(review): both listeners count into the same doneLatch and messageCount, so a
+         // delivery on either queue counts toward the expected total - confirm this is intended.
+         doneLatch = new CountDownLatch(NUM_MESSAGE_TO_SEND*MAX_PRODUCERS);
+
+         container1 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(500), QUEUE1_NAME);
+         container1.afterPropertiesSet();
+         container2 = createDefaultMessageListenerContainer(acf2, new TestMessageListener1(30000), QUEUE2_NAME);
+         container2.afterPropertiesSet();
+
+         executor = Executors.newCachedThreadPool();
+         for (int i = 0; i < MAX_PRODUCERS; i++) {
+             executor.submit(new NonPooledProducerTask(acf1, QUEUE2_NAME));
+             Thread.sleep(1000);
+             executor.submit(new NonPooledProducerTask(acf1, QUEUE1_NAME));
+         }
+
+         assertTrue(doneLatch.await(20, TimeUnit.SECONDS));
+
+         Assert.assertEquals(MAX_PRODUCERS * NUM_MESSAGE_TO_SEND, messageCount.get());
+     } finally {
+         // Cleanup is null-guarded and chained with finally so that a failure during setup
+         // (or during one cleanup step) is not replaced by a NullPointerException and does
+         // not leave producer threads, a container or a broker running.
+         if (executor != null) {
+             executor.shutdownNow();
+         }
+         try {
+             if (container1 != null) {
+                 container1.stop();
+                 container1.destroy();
+             }
+         } finally {
+             try {
+                 if (container2 != null) {
+                     container2.stop();
+                     container2.destroy();
+                 }
+             } finally {
+                 try {
+                     if (brokerService1 != null) {
+                         brokerService1.stop();
+                     }
+                 } finally {
+                     if (brokerService2 != null) {
+                         brokerService2.stop();
+                     }
+                 }
+             }
+         }
+     }
+ }
+
+
+
+
+ /**
+  * Assembles a broker for these tests without starting it: non-persistent, JMX enabled,
+  * with a broker memory limit of 5000000, one destination policy entry keyed on the
+  * {@code ">"} queue pattern with a memory limit of 1000, one transport connector and,
+  * optionally, one network connector.
+  *
+  * @param brokerName name given to the broker and to both connectors
+  * @param uri1 URI the transport connector is configured with
+  * @param uri2 URI appended to {@code "static:"} for the network connector, or
+  *        {@code null} to create no network connector
+  * @return the configured broker; starting and stopping it is left to the caller
+  * @throws Exception propagated from URI parsing or from registering a connector
+  */
+ private BrokerService createBrokerService(final String brokerName,
+         final String uri1, final String uri2) throws Exception {
+     final BrokerService broker = new BrokerService();
+
+     // Basic identity and runtime switches.
+     broker.setBrokerName(brokerName);
+     broker.setPersistent(false);
+     broker.setUseJmx(true);
+
+     // Memory limit for the broker as a whole.
+     final UsageManager brokerUsage = new UsageManager();
+     brokerUsage.setLimit(5000000);
+     broker.setMemoryManager(brokerUsage);
+
+     // Single policy entry for the ">" queue pattern (presumably "all queues" - verify),
+     // rather than for QUEUE1_NAME only.
+     final PolicyEntry queuePolicy = new PolicyEntry();
+     queuePolicy.setQueue(">");
+     queuePolicy.setMemoryLimit(1000);
+
+     final ArrayList entries = new ArrayList();
+     entries.add(queuePolicy);
+
+     final PolicyMap destinationPolicy = new PolicyMap();
+     destinationPolicy.setPolicyEntries(entries);
+     broker.setDestinationPolicy(destinationPolicy);
+
+     // Connectors are registered last, after every broker setting above is in place.
+     final TransportConnector transport = new TransportConnector();
+     transport.setUri(new URI(uri1));
+     transport.setBrokerName(brokerName);
+     transport.setName(brokerName + ".transportConnector");
+     broker.addConnector(transport);
+
+     if (uri2 == null) {
+         return broker;
+     }
+
+     final NetworkConnector bridge = new DiscoveryNetworkConnector(new URI("static:" + uri2));
+     bridge.setBridgeTempDestinations(true);
+     bridge.setBrokerName(brokerName);
+     broker.addNetworkConnector(bridge);
+     return broker;
+ }
+
+ /**
+  * Creates a Spring listener container bound to the destination named {@code queue}.
+  * The container is non-transacted, uses {@code Session.AUTO_ACKNOWLEDGE} and runs
+  * {@code MAX_CONSUMERS} concurrent consumers. This method does not call
+  * {@code afterPropertiesSet()}; that is left to the caller.
+  *
+  * @param acf connection factory the container obtains its connections from
+  * @param listener listener that receives every consumed message
+  * @param queue name of the destination to consume from
+  * @return the configured container
+  */
+ public DefaultMessageListenerContainer createDefaultMessageListenerContainer(
+         final ConnectionFactory acf, final MessageListener listener,
+         final String queue) {
+     final DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
+
+     // Session behaviour: plain auto-acknowledged sessions.
+     listenerContainer.setSessionTransacted(false);
+     listenerContainer.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+     listenerContainer.setConcurrentConsumers(MAX_CONSUMERS);
+
+     // Wiring: where to connect, what to consume, who handles it.
+     listenerContainer.setConnectionFactory(acf);
+     listenerContainer.setDestinationName(queue);
+     listenerContainer.setMessageListener(listener);
+
+     return listenerContainer;
+ }
+
+ /**
+  * Creates an {@code ActiveMQConnectionFactory} for {@code url} with the fixed settings
+  * used by these tests: sends are always synchronous and not copied, dispatch is
+  * asynchronous and optimized, compression and optimized acknowledgement are off.
+  *
+  * @param url broker URL the factory connects to
+  * @return the configured factory
+  */
+ public ActiveMQConnectionFactory createConnectionFactory(final String url) {
+     final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
+
+     // Producer side.
+     factory.setCopyMessageOnSend(false);
+     factory.setUseAsyncSend(false);
+     factory.setAlwaysSyncSend(true);
+     factory.setUseCompression(false);
+
+     // Consumer side.
+     factory.setDispatchAsync(true);
+     factory.setOptimizedMessageDispatch(true);
+     factory.setOptimizeAcknowledge(false);
+
+     return factory;
+ }
+
+ private class TestMessageListener1 implements MessageListener {
+
+ private final long waitTime;
+
+ public TestMessageListener1(long waitTime) {
+ this.waitTime = waitTime;
+
+ }
+
+ public void onMessage(Message msg) {
+
+ try {
+ log.info("Listener1 Consumed message "+ msg.getIntProperty("count"));
+
+ messageCount.incrementAndGet();
+ doneLatch.countDown();
+
+ Thread.sleep(waitTime);
+ } catch (JMSException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ }
+
+
+ private static class PooledProducerTask implements Runnable {
+
+ private final String queueName;
+
+ private final PooledConnectionFactory pcf;
+
+ public PooledProducerTask(final PooledConnectionFactory pcf,
+ final String queueName) {
+ this.pcf = pcf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+
+ try {
+
+ final JmsTemplate jmsTemplate = new JmsTemplate(pcf);
+ jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+
+ Thread.sleep(2000);
+
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new MessageCreator() {
+
+ public Message createMessage(Session session)
+ throws JMSException {
+
+ final BytesMessage message = session.createBytesMessage();
+
+ message.writeBytes(bytes);
+ message.setIntProperty("count", count.incrementAndGet());
+ message.setStringProperty("producer", "pooled");
+ return message;
+ }
+ });
+
+ log.info("PooledProducer sent message: "+ count.get());
+ // Thread.sleep(1000);
+ }
+
+ } catch (final Throwable e) {
+ log.error("Producer 1 is exiting", e);
+ }
+ }
+ }
+
+
+ private static class NonPooledProducerTask implements Runnable {
+
+ private final String queueName;
+
+ private final ConnectionFactory cf;
+
+ public NonPooledProducerTask(final ConnectionFactory cf,
+ final String queueName) {
+ this.cf = cf;
+ this.queueName = queueName;
+ }
+
+ public void run() {
+
+ try {
+
+ final JmsTemplate jmsTemplate = new JmsTemplate(cf);
+ jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setMessageIdEnabled(false);
+ jmsTemplate.setMessageTimestampEnabled(false);
+ jmsTemplate.afterPropertiesSet();
+
+ final byte[] bytes = new byte[2048];
+ final Random r = new Random();
+ r.nextBytes(bytes);
+
+ Thread.sleep(2000);
+
+ final AtomicInteger count = new AtomicInteger();
+ for (int i = 0; i < NUM_MESSAGE_TO_SEND; i++) {
+ jmsTemplate.send(queueName, new MessageCreator() {
+
+ public Message createMessage(Session session)
+ throws JMSException {
+
+ final BytesMessage message = session
+ .createBytesMessage();
+
+ message.writeBytes(bytes);
+ message.setIntProperty("count", count
+ .incrementAndGet());
+ message.setStringProperty("producer", "non-pooled");
+ return message;
+ }
+ });
+
+ log.info("Non-PooledProducer sent message: " + count.get());
+
+ // Thread.sleep(1000);
+ }
+
+ } catch (final Throwable e) {
+ log.error("Producer 1 is exiting", e);
+ }
+ }
+ }
+
+}