You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/06/27 20:53:33 UTC
svn commit: r551271 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ broker/
broker/region/ broker/region/cursors/ broker/region/policy/ command/ jndi/
kaha/impl/async/ kaha/impl/container/ kaha/impl/data/ kaha/impl/index/hash/
...
Author: rajdavies
Date: Wed Jun 27 11:53:30 2007
New Revision: 551271
URL: http://svn.apache.org/viewvc?view=rev&rev=551271
Log:
Ensure that access to member variables is always synchronized.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Jun 27 11:53:30 2007
@@ -591,7 +591,7 @@
props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled()));
props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend()));
- props.setProperty("producerWindowSize", Integer.toString(producerWindowSize));
+ props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
}
public boolean isUseCompression() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jun 27 11:53:30 2007
@@ -136,7 +136,6 @@
private transient Thread shutdownHook;
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
- private String[] proxyConnectorURIs;
private JmsConnector[] jmsBridgeConnectors; //these are Jms to Jms bridges to other jms messaging systems
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
@@ -1126,13 +1125,7 @@
addNetworkConnector(uri);
}
}
- if (proxyConnectorURIs != null) {
- for (int i = 0; i < proxyConnectorURIs.length; i++) {
- String uri = proxyConnectorURIs[i];
- addProxyConnector(uri);
- }
- }
-
+
if (jmsBridgeConnectors != null){
for (int i = 0; i < jmsBridgeConnectors.length; i++){
addJmsConnector(jmsBridgeConnectors[i]);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Jun 27 11:53:30 2007
@@ -349,7 +349,7 @@
return null;
}
- public Response processWireFormat(WireFormatInfo info) throws Exception{
+ public synchronized Response processWireFormat(WireFormatInfo info) throws Exception{
wireFormatInfo=info;
protocolVersion.set(info.getVersion());
return null;
@@ -370,6 +370,9 @@
if(cs!=null){
context=cs.getContext();
}
+ if (cs == null) {
+ throw new NullPointerException("Context is null");
+ }
// Avoid replaying dup commands
if(cs.getTransactionState(info.getTransactionId())==null){
cs.addTransactionState(info.getTransactionId());
@@ -391,6 +394,9 @@
if(cs!=null){
context=cs.getContext();
}
+ if (cs == null) {
+ throw new NullPointerException("Context is null");
+ }
TransactionState transactionState=cs.getTransactionState(info.getTransactionId());
if(transactionState==null)
throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
@@ -414,6 +420,9 @@
if(cs!=null){
context=cs.getContext();
}
+ if (cs == null) {
+ throw new NullPointerException("Context is null");
+ }
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context,info.getTransactionId(),true);
return null;
@@ -425,6 +434,9 @@
if(cs!=null){
context=cs.getContext();
}
+ if (cs == null) {
+ throw new NullPointerException("Context is null");
+ }
cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context,info.getTransactionId(),false);
return null;
@@ -436,6 +448,9 @@
if(cs!=null){
context=cs.getContext();
}
+ if (cs == null) {
+ throw new NullPointerException("Context is null");
+ }
cs.removeTransactionState(info.getTransactionId());
broker.rollbackTransaction(context,info.getTransactionId());
return null;
@@ -869,17 +884,20 @@
log.debug("Stopping connection: "+transport.getRemoteAddress());
connector.onStopped(this);
try{
- if(masterBroker!=null){
- masterBroker.stop();
- }
- if (duplexBridge != null) {
- duplexBridge.stop();
- }
- // If the transport has not failed yet,
- // notify the peer that we are doing a normal shutdown.
- if(transportException==null){
- transport.oneway(new ShutdownInfo());
+ synchronized(this){
+ if(masterBroker!=null){
+ masterBroker.stop();
+ }
+ if(duplexBridge!=null){
+ duplexBridge.stop();
+ }
+ // If the transport has not failed yet,
+ // notify the peer that we are doing a normal shutdown.
+ if(transportException==null){
+ transport.oneway(new ShutdownInfo());
+ }
}
+
}catch(Exception ignore){
log.trace("Exception caught stopping",ignore);
}
@@ -1069,7 +1087,7 @@
this.pendingStop=pendingStop;
}
- public Response processBrokerInfo(BrokerInfo info){
+ public synchronized Response processBrokerInfo(BrokerInfo info){
if(info.isSlaveBroker()){
// stream messages from this broker (the master) to
// the slave
@@ -1172,8 +1190,10 @@
synchronized(consumerExchanges){
result=new ConsumerBrokerExchange();
ConnectionState state=lookupConnectionState(id);
- context=state.getContext();
- result.setConnectionContext(context);
+ synchronized(this){
+ context=state.getContext();
+ result.setConnectionContext(context);
+ }
SessionState ss=state.getSessionState(id.getParentId());
if(ss!=null){
ConsumerState cs=ss.getConsumerState(id);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Jun 27 11:53:30 2007
@@ -54,7 +54,7 @@
return active;
}
- protected boolean isFull(){
+ protected synchronized boolean isFull(){
return !active||super.isFull();
}
@@ -113,25 +113,23 @@
topic.deactivate(context,this);
}
}
- synchronized(dispatched){
- for(Iterator iter=dispatched.iterator();iter.hasNext();){
- // Mark the dispatched messages as redelivered for next time.
- MessageReference node=(MessageReference)iter.next();
- Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
- if(count!=null){
- redeliveredMessages.put(node.getMessageId(),Integer.valueOf(count.intValue()+1));
- }else{
- redeliveredMessages.put(node.getMessageId(),Integer.valueOf(1));
- }
- if(keepDurableSubsActive){
- synchronized(pending){
- pending.addMessageFirst(node);
- }
- }else{
- node.decrementReferenceCount();
+ for(Iterator iter=dispatched.iterator();iter.hasNext();){
+ // Mark the dispatched messages as redelivered for next time.
+ MessageReference node=(MessageReference)iter.next();
+ Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
+ if(count!=null){
+ redeliveredMessages.put(node.getMessageId(),Integer.valueOf(count.intValue()+1));
+ }else{
+ redeliveredMessages.put(node.getMessageId(),Integer.valueOf(1));
+ }
+ if(keepDurableSubsActive){
+ synchronized(pending){
+ pending.addMessageFirst(node);
}
- iter.remove();
+ }else{
+ node.decrementReferenceCount();
}
+ iter.remove();
}
if(!keepDurableSubsActive){
synchronized(pending){
@@ -167,11 +165,11 @@
super.add(node);
}
- protected void doAddRecoveredMessage(MessageReference message) throws Exception{
+ protected synchronized void doAddRecoveredMessage(MessageReference message) throws Exception{
pending.addRecoveredMessage(message);
}
- public int getPendingQueueSize(){
+ public synchronized int getPendingQueueSize(){
if(active||keepDurableSubsActive){
return super.getPendingQueueSize();
}
@@ -184,7 +182,7 @@
"You cannot dynamically change the selector for durable topic subscriptions");
}
- protected boolean canDispatch(MessageReference node){
+ protected synchronized boolean canDispatch(MessageReference node){
return active;
}
@@ -198,7 +196,7 @@
return subscriptionKey.getSubscriptionName();
}
- public String toString(){
+ public synchronized String toString(){
return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
+", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter
+", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension;
@@ -215,7 +213,7 @@
/**
* Release any references that we are holding.
*/
- public void destroy(){
+ public synchronized void destroy(){
try{
synchronized(pending){
pending.reset();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Wed Jun 27 11:53:30 2007
@@ -76,7 +76,7 @@
this.referenceCount=1;
message.incrementReferenceCount();
- this.cachedSize = message != null ? message.getSize() : 0;
+ this.cachedSize = message.getSize();
}
synchronized public Message getMessageHardRef() {
@@ -212,7 +212,7 @@
return false;
}
- public int getSize(){
+ public synchronized int getSize(){
Message msg = message;
if (msg != null){
return msg.getSize();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Jun 27 11:53:30 2007
@@ -104,7 +104,7 @@
* Occurs when a pull times out. If nothing has been dispatched since the timeout was setup, then send the NULL
* message.
*/
- private synchronized void pullTimeout(long dispatchCounterBeforePull){
+ final synchronized void pullTimeout(long dispatchCounterBeforePull){
if(dispatchCounterBeforePull==dispatchCounter){
try{
add(QueueMessageReference.NULL_MESSAGE);
@@ -300,14 +300,14 @@
/**
* @return true when 60% or more room is left for dispatching messages
*/
- public boolean isLowWaterMark(){
+ public synchronized boolean isLowWaterMark(){
return (dispatched.size()-prefetchExtension)<=(info.getPrefetchSize()*.4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
- public boolean isHighWaterMark(){
+ public synchronized boolean isHighWaterMark(){
return (dispatched.size()-prefetchExtension)>=(info.getPrefetchSize()*.9);
}
@@ -315,16 +315,12 @@
return info.getPrefetchSize()+prefetchExtension-dispatched.size();
}
- public int getPendingQueueSize(){
- synchronized(pending){
- return pending.size();
- }
+ public synchronized int getPendingQueueSize(){
+ return pending.size();
}
- public int getDispatchedQueueSize(){
- synchronized(dispatched){
- return dispatched.size();
- }
+ public synchronized int getDispatchedQueueSize(){
+ return dispatched.size();
}
synchronized public long getDequeueCounter(){
@@ -344,11 +340,11 @@
}
- public PendingMessageCursor getPending(){
+ public synchronized PendingMessageCursor getPending(){
return this.pending;
}
- public void setPending(PendingMessageCursor pending){
+ public synchronized void setPending(PendingMessageCursor pending){
this.pending=pending;
}
@@ -413,7 +409,7 @@
}
}
- protected boolean dispatch(final MessageReference node) throws IOException{
+ protected synchronized boolean dispatch(final MessageReference node) throws IOException{
final Message message=node.getMessage();
if(message==null){
return false;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Jun 27 11:53:30 2007
@@ -47,7 +47,6 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.Store;
@@ -421,7 +420,7 @@
doMessageSend(producerExchange, message);
}
- private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+ void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
if(store!=null&&message.isPersistent()){
@@ -979,7 +978,7 @@
}
- private void sendMessage(final ConnectionContext context,Message msg) throws Exception{
+ final void sendMessage(final ConnectionContext context,Message msg) throws Exception{
synchronized(messages){
messages.addMessageLast(msg);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Jun 27 11:53:30 2007
@@ -39,7 +39,7 @@
return !((QueueMessageReference)node).isAcked();
}
- public String toString() {
+ public synchronized String toString() {
return
"QueueBrowserSubscription:" +
" consumer="+info.getConsumerId()+
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Jun 27 11:53:30 2007
@@ -139,7 +139,7 @@
}
}
- public String toString() {
+ public synchronized String toString() {
return
"QueueSubscription:" +
" consumer="+info.getConsumerId()+
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Jun 27 11:53:30 2007
@@ -341,7 +341,7 @@
doMessageSend(producerExchange, message);
}
- private void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+ void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Wed Jun 27 11:53:30 2007
@@ -166,7 +166,7 @@
nonPersistent.addMessageLast(node);
}
- public void clear(){
+ public synchronized void clear(){
pendingCount=0;
nonPersistent.clear();
for(PendingMessageCursor tsp: storePrefetches){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Wed Jun 27 11:53:30 2007
@@ -90,7 +90,7 @@
}
}
- public void addMessageFirst(MessageReference node) throws Exception{
+ public synchronized void addMessageFirst(MessageReference node) throws Exception{
if(node!=null){
Message msg=node.getMessage();
if(started){
@@ -105,7 +105,7 @@
}
}
- public void clear(){
+ public synchronized void clear(){
pendingCount=0;
}
@@ -150,7 +150,7 @@
persistent.reset();
}
- public int size(){
+ public synchronized int size(){
return pendingCount;
}
@@ -165,25 +165,25 @@
* @see org.apache.activemq.region.cursors.PendingMessageCursor
* @return true if recovery required
*/
- public boolean isRecoveryRequired(){
+ public synchronized boolean isRecoveryRequired(){
return false;
}
/**
* @return the nonPersistent Cursor
*/
- public PendingMessageCursor getNonPersistent(){
+ public synchronized PendingMessageCursor getNonPersistent(){
return this.nonPersistent;
}
/**
* @param nonPersistent cursor to set
*/
- public void setNonPersistent(PendingMessageCursor nonPersistent){
+ public synchronized void setNonPersistent(PendingMessageCursor nonPersistent){
this.nonPersistent=nonPersistent;
}
- public void setMaxBatchSize(int maxBatchSize){
+ public synchronized void setMaxBatchSize(int maxBatchSize){
persistent.setMaxBatchSize(maxBatchSize);
if(nonPersistent!=null){
nonPersistent.setMaxBatchSize(maxBatchSize);
@@ -191,7 +191,7 @@
super.setMaxBatchSize(maxBatchSize);
}
- public void gc() {
+ public synchronized void gc() {
if (persistent != null) {
persistent.gc();
}
@@ -200,7 +200,7 @@
}
}
- public void setUsageManager(UsageManager usageManager){
+ public synchronized void setUsageManager(UsageManager usageManager){
super.setUsageManager(usageManager);
if (persistent != null) {
persistent.setUsageManager(usageManager);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Wed Jun 27 11:53:30 2007
@@ -80,7 +80,7 @@
/**
* @return true if there are no pendingCount messages
*/
- public boolean isEmpty(){
+ public synchronized boolean isEmpty(){
return pendingCount <= 0;
}
@@ -99,7 +99,7 @@
}
}
- public void addMessageFirst(MessageReference node) throws Exception{
+ public synchronized void addMessageFirst(MessageReference node) throws Exception{
if(node!=null){
if(started){
firstMessageId=node.getMessageId();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Wed Jun 27 11:53:30 2007
@@ -92,7 +92,7 @@
this.maximumSize=maximumSize;
}
- public Message[] browse(ActiveMQDestination destination) throws Exception{
+ public synchronized Message[] browse(ActiveMQDestination destination) throws Exception{
List result=new ArrayList();
DestinationFilter filter=DestinationFilter.parseFilter(destination);
int t=tail;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Wed Jun 27 11:53:30 2007
@@ -90,7 +90,7 @@
return IntrospectionSupport.toString(this);
}
- public int hasCode() {
+ public int hashCode() {
int h1 = clientId != null ? clientId.hashCode():-1;
int h2 = subcriptionName != null ? subcriptionName.hashCode():-1;
return h1 ^ h2;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/jndi/ReadOnlyContext.java Wed Jun 27 11:53:30 2007
@@ -389,6 +389,7 @@
}
private class ListEnumeration extends LocalNamingEnumeration {
+ ListEnumeration(){}
public Object next() throws NamingException {
return nextElement();
}
@@ -400,6 +401,8 @@
}
private class ListBindingEnumeration extends LocalNamingEnumeration {
+ ListBindingEnumeration(){
+ }
public Object next() throws NamingException {
return nextElement();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Wed Jun 27 11:53:30 2007
@@ -61,8 +61,8 @@
public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE;
- public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; //
- public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; //
+ static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; //
+ static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; //
public static final byte DATA_ITEM_TYPE=1;
public static final byte REDO_ITEM_TYPE=2;
@@ -71,8 +71,8 @@
public static final String DEFAULT_FILE_PREFIX="data-";
public static final int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
- private File directory = new File(DEFAULT_DIRECTORY);
- private String filePrefix=DEFAULT_FILE_PREFIX;
+ File directory = new File(DEFAULT_DIRECTORY);
+ String filePrefix=DEFAULT_FILE_PREFIX;
private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512;
@@ -101,8 +101,10 @@
started=true;
directory.mkdirs();
- controlFile = new ControlFile(new File(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH);
- controlFile.lock();
+ synchronized(this){
+ controlFile=new ControlFile(new File(directory,filePrefix+"control"),CONTROL_RECORD_MAX_LENGTH);
+ controlFile.lock();
+ }
ByteSequence sequence = controlFile.load();
if( sequence != null && sequence.getLength()>0 ) {
@@ -116,7 +118,7 @@
File[] files=directory.listFiles(new FilenameFilter(){
public boolean accept(File dir,String n){
- return dir.equals(dir)&&n.startsWith(filePrefix);
+ return dir.equals(directory)&&n.startsWith(filePrefix);
}
});
@@ -252,8 +254,8 @@
}
DataFile getDataFile(Location item) throws IOException{
- Integer key=new Integer(item.getDataFileId());
- DataFile dataFile=(DataFile) fileMap.get(key);
+ Integer key= Integer.valueOf(item.getDataFileId());
+ DataFile dataFile=fileMap.get(key);
if(dataFile==null){
log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file "+filePrefix+"-"+item.getDataFileId());
@@ -265,14 +267,12 @@
return (DataFile) dataFile.getNext();
}
- public void close() throws IOException{
- synchronized(this){
- if(!started){
- return;
- }
- Scheduler.cancel(cleanupTask);
- accessorPool.close();
+ public synchronized void close() throws IOException{
+ if(!started){
+ return;
}
+ Scheduler.cancel(cleanupTask);
+ accessorPool.close();
storeState(false);
appender.close();
fileMap.clear();
@@ -281,7 +281,7 @@
started=false;
}
- private synchronized void cleanup() {
+ synchronized void cleanup() {
if( accessorPool!=null ) {
accessorPool.disposeUnused();
}
@@ -375,7 +375,7 @@
}
}
- private void removeDataFile(DataFile dataFile) throws IOException{
+ private synchronized void removeDataFile(DataFile dataFile) throws IOException{
// Make sure we don't delete too much data.
if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) {
@@ -414,7 +414,7 @@
return mark;
}
- public Location getNextLocation(Location location) throws IOException, IllegalStateException {
+ public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
Location cur = null;
@@ -492,17 +492,17 @@
storeState(sync);
}
- private void storeState(boolean sync) throws IOException {
- ByteSequence state = marshallState();
- appender.storeItem(state, Location.MARK_TYPE, sync);
- controlFile.store(state, sync);
- }
+ private synchronized void storeState(boolean sync) throws IOException{
+ ByteSequence state=marshallState();
+ appender.storeItem(state,Location.MARK_TYPE,sync);
+ controlFile.store(state,sync);
+ }
- public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+ public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, Location.USER_TYPE, sync);
}
- public Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
+ public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
return appender.storeItem(data, type, sync);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Wed Jun 27 11:53:30 2007
@@ -27,7 +27,7 @@
*
* @version $Revision: 1.1.1.1 $
*/
-class DataFile extends LinkedNode implements Comparable {
+class DataFile extends LinkedNode implements Comparable<DataFile> {
private final File file;
private final Integer dataFileId;
@@ -39,7 +39,7 @@
DataFile(File file, int number, int preferedSize){
this.file=file;
this.preferedSize = preferedSize;
- this.dataFileId=new Integer(number);
+ this.dataFileId=Integer.valueOf(number);
length=(int)(file.exists()?file.length():0);
}
@@ -98,10 +98,17 @@
return file.delete();
}
- public int compareTo(Object o) {
- DataFile df = (DataFile) o;
+ public int compareTo(DataFile df) {
return dataFileId - df.dataFileId;
}
+
+ public boolean equals(Object o) {
+ boolean result = false;
+ if (o instanceof DataFile) {
+ result = compareTo((DataFile)o)==0;
+ }
+ return result;
+ }
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Wed Jun 27 11:53:30 2007
@@ -55,10 +55,13 @@
return hash;
}
- public boolean equals(Object obj) {
- WriteKey di = (WriteKey)obj;
- return di.file == file && di.offset == offset;
- }
+ public boolean equals(Object obj){
+ if(obj instanceof WriteKey){
+ WriteKey di=(WriteKey)obj;
+ return di.file==file&&di.offset==offset;
+ }
+ return false;
+ }
}
public class WriteBatch {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java Wed Jun 27 11:53:30 2007
@@ -131,5 +131,18 @@
}
return dataFileId - l.dataFileId;
}
+
+ public boolean equals(Object o) {
+ boolean result = false;
+ if (o instanceof Location) {
+ result = compareTo((Location)o)==0;
+ }
+ return result;
+ }
+
+ public int hashCode() {
+ return dataFileId ^ offset;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java Wed Jun 27 11:53:30 2007
@@ -188,24 +188,26 @@
}
}
- protected final void delete(IndexItem key,IndexItem prev,IndexItem next){
- try{
- dataManager.removeInterestInFile(key.getKeyFile());
- dataManager.removeInterestInFile(key.getValueFile());
- prev=prev==null?root:prev;
- next=next!=root?next:null;
- if(next!=null){
- prev.setNextItem(next.getOffset());
- next.setPreviousItem(prev.getOffset());
- updateIndexes(next);
- }else{
- prev.setNextItem(Item.POSITION_NOT_SET);
+ protected final void delete(final IndexItem keyItem,final IndexItem prevItem,final IndexItem nextItem){
+ if(keyItem!=null){
+ try{
+ IndexItem prev=prevItem==null?root:prevItem;
+ IndexItem next=nextItem!=root?nextItem:null;
+ dataManager.removeInterestInFile(keyItem.getKeyFile());
+ dataManager.removeInterestInFile(keyItem.getValueFile());
+ if(next!=null){
+ prev.setNextItem(next.getOffset());
+ next.setPreviousItem(prev.getOffset());
+ updateIndexes(next);
+ }else{
+ prev.setNextItem(Item.POSITION_NOT_SET);
+ }
+ updateIndexes(prev);
+ indexManager.freeIndex(keyItem);
+ }catch(IOException e){
+ log.error("Failed to delete "+keyItem,e);
+ throw new RuntimeStoreException(e);
}
- updateIndexes(prev);
- indexManager.freeIndex(key);
- }catch(IOException e){
- log.error("Failed to delete "+key,e);
- throw new RuntimeStoreException(e);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerKeySet.java Wed Jun 27 11:53:30 2007
@@ -115,13 +115,14 @@
}
public String toString() {
- String result ="ContainerKeySet[";
+ StringBuffer result =new StringBuffer(32);
+ result.append("ContainerKeySet[");
IndexItem item = container.getInternalList().getRoot();
while ((item = container.getInternalList().getNextEntry(item)) != null) {
- result += container.getKey(item);
- result += ",";
+ result.append(container.getKey(item));
+ result.append(",");
}
- result +="]";
- return result;
+ result.append("]");
+ return result.toString();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Wed Jun 27 11:53:30 2007
@@ -121,6 +121,10 @@
}
return result;
}
+
+ public int hashCode() {
+ return super.hashCode();
+ }
/*
* (non-Javadoc)
@@ -158,13 +162,14 @@
public synchronized Object removeFirst(){
load();
Object result=null;
- IndexItem item=(IndexItem)indexList.getFirst();
+ IndexItem item=indexList.getFirst();
if(item!=null){
itemRemoved(0);
result=getValue(item);
IndexItem prev=root;
IndexItem next=indexList.size()>1?(IndexItem)indexList.get(1):null;
indexList.removeFirst();
+
delete(item,prev,next);
item=null;
}
@@ -306,6 +311,7 @@
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
+
delete(item,prev,next);
}
@@ -591,7 +597,6 @@
*/
public synchronized ListIterator listIterator(){
load();
- IndexItem start= indexList.getFirst();
return new ContainerListIterator(this,indexList,indexList.getRoot());
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Wed Jun 27 11:53:30 2007
@@ -497,13 +497,12 @@
protected synchronized IndexItem write(Object key,Object value){
IndexItem index=null;
try{
- if(key!=null){
- index=indexManager.createNewIndex();
- StoreLocation data=dataManager.storeDataItem(keyMarshaller,key);
- index.setKeyData(data);
- }
+ index=indexManager.createNewIndex();
+ StoreLocation data=dataManager.storeDataItem(keyMarshaller,key);
+ index.setKeyData(data);
+
if(value!=null){
- StoreLocation data=dataManager.storeDataItem(valueMarshaller,value);
+ data=dataManager.storeDataItem(valueMarshaller,value);
index.setValueData(data);
}
IndexItem prev=indexList.getLast();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java Wed Jun 27 11:53:30 2007
@@ -38,7 +38,7 @@
DataFile(File file,int number){
this.file=file;
- this.number=new Integer(number);
+ this.number=Integer.valueOf(number);
length=file.exists()?file.length():0;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Wed Jun 27 11:53:30 2007
@@ -43,7 +43,7 @@
private static final Log log=LogFactory.getLog(DataManagerImpl.class);
public static final long MAX_FILE_LENGTH=1024*1024*32;
private static final String NAME_PREFIX="data-";
- private final File dir;
+ private final File directory;
private final String name;
private SyncDataFileReader reader;
private SyncDataFileWriter writer;
@@ -59,14 +59,14 @@
private String dataFilePrefix;
public DataManagerImpl(File dir, final String name){
- this.dir=dir;
+ this.directory=dir;
this.name=name;
dataFilePrefix = NAME_PREFIX+name+"-";
// build up list of current dataFiles
File[] files=dir.listFiles(new FilenameFilter(){
public boolean accept(File dir,String n){
- return dir.equals(dir)&&n.startsWith(dataFilePrefix);
+ return dir.equals(directory)&&n.startsWith(dataFilePrefix);
}
});
if(files!=null){
@@ -86,7 +86,7 @@
private DataFile createAndAddDataFile(int num){
String fileName=dataFilePrefix+num;
- File file=new File(dir,fileName);
+ File file=new File(directory,fileName);
DataFile result=new DataFile(file,num);
fileMap.put(result.getNumber(),result);
return result;
@@ -114,7 +114,7 @@
}
DataFile getDataFile(StoreLocation item) throws IOException{
- Integer key=new Integer(item.getFile());
+ Integer key=Integer.valueOf(item.getFile());
DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile==null){
log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java Wed Jun 27 11:53:30 2007
@@ -297,12 +297,8 @@
}
}
- private void doUnderFlow(int index) {
- int pageNo = index / maximumEntries;
- int nextPageNo = pageNo + 1;
- if (nextPageNo < hashPages.size()) {
- }
- HashPageInfo info = hashPages.get(pageNo);
+ private void doUnderFlow(@SuppressWarnings("unused") int index) {
+ //does little
}
private void end() throws IOException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashEntry.java Wed Jun 27 11:53:30 2007
@@ -43,7 +43,7 @@
return compareTo(o)==0;
}
- public int hasCode(){
+ public int hashCode(){
return key.hashCode();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java Wed Jun 27 11:53:30 2007
@@ -260,7 +260,7 @@
public synchronized void delete() throws IOException{
unload();
if(file.exists()){
- boolean result=file.delete();
+ file.delete();
}
length=0;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java Wed Jun 27 11:53:30 2007
@@ -225,9 +225,12 @@
void dump() {
- String str = this + ": ";
+ StringBuffer str = new StringBuffer(32);
+ str.append(toString());
+ str.append(": ");
for (HashEntry entry : hashIndexEntries) {
- str += entry + ",";
+ str .append(entry);
+ str.append(",");
}
log.info(str);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/tree/TreeEntry.java Wed Jun 27 11:53:30 2007
@@ -45,7 +45,7 @@
return compareTo(o)==0;
}
- public int hasCode(){
+ public int hashCode(){
return key.hashCode();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java Wed Jun 27 11:53:30 2007
@@ -50,7 +50,7 @@
}, "Cache Evictor: "+System.identityHashCode(this));
}
- private boolean evictMessages() {
+ boolean evictMessages() {
// Try to take the memory usage down below the low mark.
try {
log.debug("Evicting cache memory usage: "+usageManager.getPercentUsage());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Jun 27 11:53:30 2007
@@ -125,7 +125,7 @@
/**
* @throws IOException
*/
- private void startBridge() throws IOException {
+ final void startBridge() throws IOException {
connectionInfo = new ConnectionInfo();
connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
connectionInfo.setClientId(clientId);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java Wed Jun 27 11:53:30 2007
@@ -196,7 +196,7 @@
protected class Synchronization implements javax.transaction.Synchronization {
private final PooledSession session;
- private Synchronization(PooledSession session) {
+ protected Synchronization(PooledSession session) {
this.session = session;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Jun 27 11:53:30 2007
@@ -168,18 +168,22 @@
}
}
- public Response processAddDestination(DestinationInfo info) {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
- if( cs != null && info != null && info.getDestination().isTemporary() ) {
- cs.addTempDestination(info);
+ public Response processAddDestination(DestinationInfo info){
+ if(info!=null){
+ ConnectionState cs=(ConnectionState)connectionStates.get(info.getConnectionId());
+ if(cs!=null&&info.getDestination().isTemporary()){
+ cs.addTempDestination(info);
+ }
}
return TRACKED_RESPONSE_MARKER;
}
- public Response processRemoveDestination(DestinationInfo info) {
- ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
- if( cs != null && info != null && info.getDestination().isTemporary() ) {
- cs.removeTempDestination(info.getDestination());
+ public Response processRemoveDestination(DestinationInfo info){
+ if(info!=null){
+ ConnectionState cs=(ConnectionState)connectionStates.get(info.getConnectionId());
+ if(cs!=null&&info.getDestination().isTemporary()){
+ cs.removeTempDestination(info.getDestination());
+ }
}
return TRACKED_RESPONSE_MARKER;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Jun 27 11:53:30 2007
@@ -130,7 +130,7 @@
}
}
- private void addMessage(final Message message,final Location location) throws InterruptedIOException{
+ void addMessage(final Message message,final Location location) throws InterruptedIOException{
ReferenceData data=new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
@@ -205,7 +205,7 @@
}
}
- private void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
+ final void removeMessage(final MessageAck ack,final Location location) throws InterruptedIOException{
ReferenceData data;
synchronized(this){
lastLocation=location;
@@ -273,7 +273,7 @@
* @return
* @throws IOException
*/
- private void asyncWrite(){
+ void asyncWrite(){
try{
CountDownLatch countDown;
synchronized(this){
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Jun 27 11:53:30 2007
@@ -218,8 +218,10 @@
if(!started.compareAndSet(true,false))
return;
this.usageManager.removeUsageListener(this);
- Scheduler.cancel(periodicCheckpointTask);
- Scheduler.cancel(periodicCleanupTask);
+ synchronized(this){
+ Scheduler.cancel(periodicCheckpointTask);
+ Scheduler.cancel(periodicCleanupTask);
+ }
Iterator<AMQMessageStore> iterator=queues.values().iterator();
while(iterator.hasNext()){
AMQMessageStore ms=iterator.next();
@@ -232,7 +234,9 @@
}
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true);
- checkpointTask.shutdown();
+ synchronized(this){
+ checkpointTask.shutdown();
+ }
queues.clear();
topics.clear();
IOException firstException=null;
@@ -259,8 +263,8 @@
CountDownLatch latch=null;
synchronized(this){
latch=nextCheckpointCountDownLatch;
+ checkpointTask.wakeup();
}
- checkpointTask.wakeup();
if(sync){
if(log.isDebugEnabled()){
log.debug("Waitng for checkpoint to complete.");
@@ -585,7 +589,7 @@
return transactionStore;
}
- public void deleteAllMessages() throws IOException{
+ public synchronized void deleteAllMessages() throws IOException{
deleteAllMessages=true;
}
@@ -669,11 +673,11 @@
this.maxCheckpointWorkers=maxCheckpointWorkers;
}
- public File getDirectory(){
+ public synchronized File getDirectory(){
return directory;
}
- public void setDirectory(File directory){
+ public synchronized void setDirectory(File directory){
this.directory=directory;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Jun 27 11:53:30 2007
@@ -143,7 +143,7 @@
* @param key
* @throws InterruptedIOException
*/
- private void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
+ protected void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
synchronized(this){
lastLocation=location;
ackedLastAckLocations.put(key,messageId);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Wed Jun 27 11:53:30 2007
@@ -125,7 +125,7 @@
}
}
- private void addMessage(final Message message, final RecordLocation location) {
+ void addMessage(final Message message, final RecordLocation location) {
synchronized (this) {
lastLocation = location;
MessageId id = message.getMessageId();
@@ -187,7 +187,7 @@
}
}
- private void removeMessage(final MessageAck ack, final RecordLocation location) {
+ final void removeMessage(final MessageAck ack, final RecordLocation location) {
synchronized (this) {
lastLocation = location;
MessageId id = ack.getLastMessageId();
@@ -253,33 +253,31 @@
ConnectionContext context = transactionTemplate.getContext();
// Checkpoint the added messages.
- Iterator iterator = cpAddedMessageIds.values().iterator();
- while (iterator.hasNext()) {
- Message message = (Message) iterator.next();
- try {
- longTermStore.addMessage(context, message);
- } catch (Throwable e) {
- log.warn("Message could not be added to long term store: " + e.getMessage(), e);
+ synchronized(JournalMessageStore.this){
+ Iterator iterator=cpAddedMessageIds.values().iterator();
+ while(iterator.hasNext()){
+ Message message=(Message)iterator.next();
+ try{
+ longTermStore.addMessage(context,message);
+ }catch(Throwable e){
+ log.warn("Message could not be added to long term store: "+e.getMessage(),e);
+ }
+ size+=message.getSize();
+ message.decrementReferenceCount();
+ // Commit the batch if it's getting too big
+ if(size>=maxCheckpointMessageAddSize){
+ persitanceAdapter.commitTransaction(context);
+ persitanceAdapter.beginTransaction(context);
+ size=0;
+ }
}
-
- size += message.getSize();
-
- message.decrementReferenceCount();
-
- // Commit the batch if it's getting too big
- if( size >= maxCheckpointMessageAddSize ) {
- persitanceAdapter.commitTransaction(context);
- persitanceAdapter.beginTransaction(context);
- size=0;
- }
-
}
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
- iterator = cpRemovedMessageLocations.iterator();
+ Iterator iterator = cpRemovedMessageLocations.iterator();
while (iterator.hasNext()) {
try {
MessageAck ack = (MessageAck) iterator.next();
@@ -303,7 +301,8 @@
if( cpActiveJournalLocations.size() > 0 ) {
Collections.sort(cpActiveJournalLocations);
return (RecordLocation) cpActiveJournalLocations.get(0);
- } else {
+ }
+ synchronized (this){
return lastLocation;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Jun 27 11:53:30 2007
@@ -92,8 +92,8 @@
private final ConcurrentHashMap topics = new ConcurrentHashMap();
private UsageManager usageManager;
- private long checkpointInterval = 1000 * 60 * 5;
- private long lastCheckpointRequest = System.currentTimeMillis();
+ long checkpointInterval = 1000 * 60 * 5;
+ long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
private int maxCheckpointWorkers = 10;
private int maxCheckpointMessageAddSize = 1024*1024;
@@ -112,7 +112,11 @@
final Runnable createPeriodicCheckpointTask() {
return new Runnable() {
public void run() {
- if( System.currentTimeMillis()>lastCheckpointRequest+checkpointInterval ) {
+ long lastTime = 0;
+ synchronized(this) {
+ lastTime = lastCheckpointRequest;
+ }
+ if( System.currentTimeMillis()>lastTime+checkpointInterval ) {
checkpoint(false, true);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Wed Jun 27 11:53:30 2007
@@ -142,7 +142,7 @@
* @param location
* @param key
*/
- private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
+ protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
synchronized(this) {
lastLocation = location;
ackedLastAckLocations.put(key, messageId);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Jun 27 11:53:30 2007
@@ -174,7 +174,7 @@
* @param nextToDispatch
* @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
*/
- public void resetBatching(){
+ public synchronized void resetBatching(){
batchEntry=null;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Jun 27 11:53:30 2007
@@ -149,7 +149,7 @@
messageContainer.clear();
}
- public void resetBatching(){
+ public synchronized void resetBatching(){
batchEntry=null;
lastBatchId=null;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Jun 27 11:53:30 2007
@@ -212,7 +212,7 @@
}
@Override
- public void setDirectory(File directory){
+ public synchronized void setDirectory(File directory){
File file = new File(directory,"data");
super.setDirectory(file);
this.stateStore=createStateStore(directory);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Wed Jun 27 11:53:30 2007
@@ -138,7 +138,7 @@
* @param ack
* @throws IOException
*/
- private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{
+ final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException{
if(ack.isInTransaction()){
KahaTransaction tx=getOrCreateTx(ack.getTransactionId());
tx.add((KahaMessageStore) destination,ack);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Wed Jun 27 11:53:30 2007
@@ -27,8 +27,7 @@
public class TopicSubContainer {
private transient ListContainer listContainer;
private transient StoreEntry batchEntry;
- private transient String lastBatchId;
-
+
public TopicSubContainer(ListContainer container) {
this.listContainer = container;
}
@@ -45,12 +44,10 @@
* @param batchEntry the batchEntry to set
*/
public void setBatchEntry(String id,StoreEntry batchEntry) {
- this.lastBatchId=id;
this.batchEntry = batchEntry;
}
public void reset() {
- lastBatchId=null;
batchEntry = null;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Wed Jun 27 11:53:30 2007
@@ -229,7 +229,7 @@
* @param ack
* @throws IOException
*/
- private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
+ final void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
if( doingRecover )
return;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java Wed Jun 27 11:53:30 2007
@@ -82,7 +82,7 @@
shutdown(0);
}
- private void runTask() {
+ final void runTask() {
try {
while( true ) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java Wed Jun 27 11:53:30 2007
@@ -96,7 +96,7 @@
public void shutdown() throws InterruptedException {
shutdown(0);
}
- private void runTask() {
+ final void runTask() {
synchronized (runable) {
queued = false;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java Wed Jun 27 11:53:30 2007
@@ -68,7 +68,7 @@
}
- private void writeCheck() {
+ final void writeCheck() {
synchronized(writeChecker) {
if( inSend.get() ) {
log.trace("A send is in progress");
@@ -90,7 +90,7 @@
}
}
- private void readCheck() {
+ final void readCheck() {
synchronized(readChecker) {
if( inReceive.get() ) {
log.trace("A receive is in progress");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java Wed Jun 27 11:53:30 2007
@@ -60,8 +60,6 @@
private String group = "default";
private final CopyOnWriteArrayList serviceInfos = new CopyOnWriteArrayList();
- private String brokerName;
-
// DiscoveryAgent interface
//-------------------------------------------------------------------------
public void start() throws Exception {
@@ -232,11 +230,16 @@
return "_" + group+"."+TYPE_SUFFIX;
}
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
- }
-
public void serviceFailed(DiscoveryEvent event) throws IOException {
// TODO: is there a way to notify the JmDNS that the service failed?
+ }
+
+ /**
+ * @param brokerName
+ * @see org.apache.activemq.transport.discovery.DiscoveryAgent#setBrokerName(java.lang.String)
+ */
+ public void setBrokerName(String brokerName){
+ // implementation of interface
+
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Jun 27 11:53:30 2007
@@ -91,7 +91,7 @@
return;
}
if (command.isResponse()) {
- Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+ Object object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
if( object!=null && object.getClass() == Tracked.class ) {
((Tracked)object).onResponses();
}
@@ -231,7 +231,7 @@
}, "ActiveMQ Failover Worker: "+System.identityHashCode(this));
}
- private void handleTransportFailure(IOException e) throws InterruptedException {
+ final void handleTransportFailure(IOException e) throws InterruptedException {
if (transportListener != null){
transportListener.transportInterupted();
}
@@ -382,9 +382,9 @@
// it later.
Tracked tracked = stateTracker.track(command);
if( tracked!=null && tracked.isWaitingForResponse() ) {
- requestMap.put(new Integer(command.getCommandId()), tracked);
+ requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
} else if ( tracked==null && command.isResponseRequired()) {
- requestMap.put(new Integer(command.getCommandId()), command);
+ requestMap.put(Integer.valueOf(command.getCommandId()), command);
}
// Send the message.
@@ -398,7 +398,7 @@
// since we will retry in this method.. take it out of the request
// map so that it is not sent 2 times on recovery
if( command.isResponseRequired() ) {
- requestMap.remove(new Integer(command.getCommandId()));
+ requestMap.remove(Integer.valueOf(command.getCommandId()));
}
// Rethrow the exception so it will be handled by the outer catch
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java?view=diff&rev=551271&r1=551270&r2=551271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/MessageComparatorSupport.java Wed Jun 27 11:53:30 2007
@@ -19,6 +19,7 @@
import javax.jms.Message;
+import java.io.Serializable;
import java.util.Comparator;
/**
@@ -26,7 +27,7 @@
*
* @version $Revision$
*/
-public abstract class MessageComparatorSupport implements Comparator {
+public abstract class MessageComparatorSupport implements Comparator, Serializable {
public int compare(Object object1, Object object2) {
Message command1 = (Message) object1;
@@ -36,11 +37,20 @@
protected abstract int compareMessages(Message message1, Message message2);
- protected int compareComparators(Comparable comparable, Comparable comparable2) {
- if (comparable != null) {
+ protected int compareComparators(final Comparable comparable, final Comparable comparable2) {
+ if (comparable == null && comparable2 == null) {
+ return 0;
+ }
+ else if (comparable != null) {
+ if (comparable2== null) {
+ return 1;
+ }
return comparable.compareTo(comparable2);
}
else if (comparable2 != null) {
+ if (comparable== null) {
+ return -11;
+ }
return comparable2.compareTo(comparable) * -1;
}
return 0;