You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/09 18:40:08 UTC
svn commit: r564271 [8/18] - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/
activemq-core/src/main/java/org/apache/activemq/advisory/
activemq-core/src/main/java/org/apache/activemq/blob/
activemq-core/src/main/java/org/apache/act...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/spring/ActiveMQXAConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/spring/ActiveMQXAConnectionFactory.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/spring/ActiveMQXAConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/spring/ActiveMQXAConnectionFactory.java Thu Aug 9 09:37:49 2007
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Aug 9 09:37:49 2007
@@ -45,7 +45,7 @@
*/
public class ConnectionStateTracker extends CommandVisitorAdapter {
- private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+ private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
private boolean trackTransactions = false;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Thu Aug 9 09:37:49 2007
@@ -1,20 +1,23 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.activemq.store;
import java.io.IOException;
+
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -37,7 +40,7 @@
* @param message
* @throws IOException
*/
- public void addMessage(ConnectionContext context, Message message) throws IOException;
+ void addMessage(ConnectionContext context, Message message) throws IOException;
/**
* Looks up a message using either the String messageID or the
@@ -48,7 +51,7 @@
* @return the message or null if it does not exist
* @throws IOException
*/
- public Message getMessage(MessageId identity) throws IOException;
+ Message getMessage(MessageId identity) throws IOException;
/**
* Removes a message from the message store.
@@ -59,7 +62,7 @@
* message that needs to be removed.
* @throws IOException
*/
- public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
+ void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
/**
* Removes all the messages from the message store.
@@ -67,7 +70,7 @@
* @param context
* @throws IOException
*/
- public void removeAllMessages(ConnectionContext context) throws IOException;
+ void removeAllMessages(ConnectionContext context) throws IOException;
/**
* Recover any messages to be delivered.
@@ -75,34 +78,34 @@
* @param container
* @throws Exception
*/
- public void recover(MessageRecoveryListener container) throws Exception;
+ void recover(MessageRecoveryListener container) throws Exception;
/**
* The destination that the message store is holding messages for.
*
* @return the destination
*/
- public ActiveMQDestination getDestination();
+ ActiveMQDestination getDestination();
/**
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
- public void setUsageManager(UsageManager usageManager);
+ void setUsageManager(UsageManager usageManager);
/**
* @return the number of messages ready to deliver
* @throws IOException
*
*/
- public int getMessageCount() throws IOException;
+ int getMessageCount() throws IOException;
/**
* A hint to the Store to reset any batching state for the Destination
*
*/
- public void resetBatching();
+ void resetBatching();
- public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
+ void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Thu Aug 9 09:37:49 2007
@@ -40,7 +40,7 @@
*
* @return active destinations
*/
- public Set<ActiveMQDestination> getDestinations();
+ Set<ActiveMQDestination> getDestinations();
/**
* Factory method to create a new queue message store with the given destination name
@@ -48,7 +48,7 @@
* @return the message store
* @throws IOException
*/
- public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
+ MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
/**
* Factory method to create a new topic message store with the given destination name
@@ -56,14 +56,14 @@
* @return the topic message store
* @throws IOException
*/
- public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
+ TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
/**
* Factory method to create a new persistent prepared transaction store for XA recovery
* @return transaction store
* @throws IOException
*/
- public TransactionStore createTransactionStore() throws IOException;
+ TransactionStore createTransactionStore() throws IOException;
/**
* This method starts a transaction on the persistent storage - which is nothing to
@@ -77,7 +77,7 @@
* @param context
* @throws IOException
*/
- public void beginTransaction(ConnectionContext context) throws IOException;
+ void beginTransaction(ConnectionContext context) throws IOException;
/**
@@ -87,7 +87,7 @@
*
* @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
- public void commitTransaction(ConnectionContext context) throws IOException;
+ void commitTransaction(ConnectionContext context) throws IOException;
/**
* Rollback a persistence transaction
@@ -96,38 +96,38 @@
*
* @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
- public void rollbackTransaction(ConnectionContext context) throws IOException;
+ void rollbackTransaction(ConnectionContext context) throws IOException;
/**
*
* @return last broker sequence
* @throws IOException
*/
- public long getLastMessageBrokerSequenceId() throws IOException;
+ long getLastMessageBrokerSequenceId() throws IOException;
/**
* Delete's all the messages in the persistent store.
*
* @throws IOException
*/
- public void deleteAllMessages() throws IOException;
+ void deleteAllMessages() throws IOException;
/**
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
- public void setUsageManager(UsageManager usageManager);
+ void setUsageManager(UsageManager usageManager);
/**
* Set the name of the broker using the adapter
* @param brokerName
*/
- public void setBrokerName(String brokerName);
+ void setBrokerName(String brokerName);
/**
* Set the directory where any data files should be created
* @param dir
*/
- public void setDirectory(File dir);
+ void setDirectory(File dir);
/**
* checkpoint any
@@ -135,5 +135,5 @@
* @throws IOException
*
*/
- public void checkpoint(boolean sync) throws IOException;
+ void checkpoint(boolean sync) throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactory.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapterFactory.java Thu Aug 9 09:37:49 2007
@@ -29,6 +29,6 @@
* Creates a persistence Adapter that can use a given directory to store it's data.
* @throws IOException
*/
- public PersistenceAdapter createPersistenceAdapter() throws IOException;
+ PersistenceAdapter createPersistenceAdapter() throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Thu Aug 9 09:37:49 2007
@@ -67,21 +67,20 @@
/**
* Adds a message reference to the message store
*/
- public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data)
- throws IOException;
+ void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException;
/**
* Looks up a message using either the String messageID or the
* messageNumber. Implementations are encouraged to fill in the missing key
* if its easy to do so.
*/
- public ReferenceData getMessageReference(MessageId identity) throws IOException;
+ ReferenceData getMessageReference(MessageId identity) throws IOException;
/**
* @return true if it supports external batch control
*/
- public boolean supportsExternalBatchControl();
+ boolean supportsExternalBatchControl();
- public void setBatch(MessageId startAfter);
+ void setBatch(MessageId startAfter);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStoreAdapter.java Thu Aug 9 09:37:49 2007
@@ -40,7 +40,7 @@
* @return the QueueReferenceStore
* @throws IOException
*/
- public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException;
+ ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException;
/**
* Factory method to create a new topic message store with the given
@@ -50,34 +50,34 @@
* @return the TopicRefererenceStore
* @throws IOException
*/
- public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException;
+ TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException;
/**
* @return Set of File ids in use
* @throws IOException
*/
- public Set<Integer> getReferenceFileIdsInUse() throws IOException;
+ Set<Integer> getReferenceFileIdsInUse() throws IOException;
/**
* If the store isn't valid, it can be recoverd at start-up
*
* @return true if the reference store is in a consistent state
*/
- public boolean isStoreValid();
+ boolean isStoreValid();
/**
* called by recover to clear out message references
*
* @throws IOException
*/
- public void clearMessages() throws IOException;
+ void clearMessages() throws IOException;
/**
* recover any state
*
* @throws IOException
*/
- public void recoverState() throws IOException;
+ void recoverState() throws IOException;
/**
* Save prepared transactions
@@ -85,12 +85,12 @@
* @param map
* @throws IOException
*/
- public void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException;
+ void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException;
/**
* @return saved prepared transactions
* @throws IOException
*/
- public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException;
+ Map<TransactionId, AMQTx> retrievePreparedState() throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Thu Aug 9 09:37:49 2007
@@ -1,15 +1,18 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.activemq.store;
@@ -37,8 +40,7 @@
* @param subscriptionPersistentId
* @throws IOException
*/
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId) throws IOException;
+ void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
/**
* @param clientId
@@ -47,7 +49,7 @@
* @throws IOException
* @throws JMSException
*/
- public void deleteSubscription(String clientId, String subscriptionName) throws IOException;
+ void deleteSubscription(String clientId, String subscriptionName) throws IOException;
/**
* For the new subscription find the last acknowledged message ID and then
@@ -60,11 +62,9 @@
* @param subscriptionName
* @param listener
* @param subscription
- *
* @throws Exception
*/
- public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
- throws Exception;
+ void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
/**
* For an active subscription - retrieve messages from the store for the
@@ -74,20 +74,17 @@
* @param subscriptionName
* @param maxReturned
* @param listener
- *
* @throws Exception
*/
- public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
- MessageRecoveryListener listener) throws Exception;
+ void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception;
/**
* A hint to the Store to reset any batching state for a durable subsriber
*
* @param clientId
* @param subscriptionName
- *
*/
- public void resetBatching(String clientId, String subscriptionName);
+ void resetBatching(String clientId, String subscriptionName);
/**
* Get the number of messages ready to deliver from the store to a durable
@@ -98,7 +95,7 @@
* @return the outstanding message count
* @throws IOException
*/
- public int getMessageCount(String clientId, String subscriberName) throws IOException;
+ int getMessageCount(String clientId, String subscriberName) throws IOException;
/**
* Finds the subscriber entry for the given consumer info
@@ -108,7 +105,7 @@
* @return the SubscriptionInfo
* @throws IOException
*/
- public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
+ SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
/**
* Lists all the durable subscriptions for a given destination.
@@ -116,7 +113,7 @@
* @return an array SubscriptionInfos
* @throws IOException
*/
- public SubscriptionInfo[] getAllSubscriptions() throws IOException;
+ SubscriptionInfo[] getAllSubscriptions() throws IOException;
/**
* Inserts the subscriber info due to a subscription change <p/> If this is
@@ -131,7 +128,6 @@
* @param selector
* @param retroactive
* @throws IOException
- *
*/
- public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
+ void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Thu Aug 9 09:37:49 2007
@@ -41,8 +41,7 @@
* @param subscriptionPersistentId
* @throws IOException
*/
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId) throws IOException;
+ void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
/**
* @param clientId
@@ -51,7 +50,7 @@
* @throws IOException
* @throws JMSException
*/
- public void deleteSubscription(String clientId, String subscriptionName) throws IOException;
+ void deleteSubscription(String clientId, String subscriptionName) throws IOException;
/**
* For the new subscription find the last acknowledged message ID and then
@@ -64,11 +63,9 @@
* @param subscriptionName
* @param listener
* @param subscription
- *
* @throws Exception
*/
- public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
- throws Exception;
+ void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception;
/**
* For an active subscription - retrieve messages from the store for the
@@ -78,20 +75,17 @@
* @param subscriptionName
* @param maxReturned
* @param listener
- *
* @throws Exception
*/
- public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
- MessageRecoveryListener listener) throws Exception;
+ void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception;
/**
* A hint to the Store to reset any batching state for a durable subsriber
*
* @param clientId
* @param subscriptionName
- *
*/
- public void resetBatching(String clientId, String subscriptionName);
+ void resetBatching(String clientId, String subscriptionName);
/**
* Get the number of messages ready to deliver from the store to a durable
@@ -102,7 +96,7 @@
* @return the outstanding message count
* @throws IOException
*/
- public int getMessageCount(String clientId, String subscriberName) throws IOException;
+ int getMessageCount(String clientId, String subscriberName) throws IOException;
/**
* Finds the subscriber entry for the given consumer info
@@ -112,7 +106,7 @@
* @return the SubscriptionInfo
* @throws IOException
*/
- public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
+ SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException;
/**
* Lists all the durable subscirptions for a given destination.
@@ -120,7 +114,7 @@
* @return an array SubscriptionInfos
* @throws IOException
*/
- public SubscriptionInfo[] getAllSubscriptions() throws IOException;
+ SubscriptionInfo[] getAllSubscriptions() throws IOException;
/**
* Inserts the subscriber info due to a subscription change <p/> If this is
@@ -135,7 +129,6 @@
* @param selector
* @param retroactive
* @throws IOException
- *
*/
- public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
+ void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionRecoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionRecoveryListener.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionRecoveryListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionRecoveryListener.java Thu Aug 9 09:37:49 2007
@@ -21,5 +21,5 @@
import org.apache.activemq.command.XATransactionId;
public interface TransactionRecoveryListener {
- public void recover(XATransactionId xid, Message[] addedMessages, MessageAck aks[]);
+ void recover(XATransactionId xid, Message[] addedMessages, MessageAck aks[]);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java Thu Aug 9 09:37:49 2007
@@ -21,18 +21,20 @@
import org.apache.activemq.Service;
import org.apache.activemq.command.TransactionId;
-
/**
- * Represents the durable store of the commit/rollback operations taken against the
- * broker.
- *
+ * Represents the durable store of the commit/rollback operations taken against
+ * the broker.
+ *
* @version $Revision: 1.2 $
*/
public interface TransactionStore extends Service {
- public void prepare(TransactionId txid) throws IOException;
- public void commit(TransactionId txid, boolean wasPrepared) throws IOException;
- public void rollback(TransactionId txid) throws IOException;
- public void recover(TransactionRecoveryListener listener) throws IOException;
-
+ void prepare(TransactionId txid) throws IOException;
+
+ void commit(TransactionId txid, boolean wasPrepared) throws IOException;
+
+ void rollback(TransactionId txid) throws IOException;
+
+ void recover(TransactionRecoveryListener listener) throws IOException;
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Thu Aug 9 09:37:49 2007
@@ -1,17 +1,19 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.activemq.store.amq;
import java.io.IOException;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Thu Aug 9 09:37:49 2007
@@ -1,15 +1,18 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.activemq.store.amq;
@@ -70,7 +73,7 @@
*/
public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
- private static final Log log = LogFactory.getLog(AMQPersistenceAdapter.class);
+ private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
private AsyncDataManager asyncDataManager;
@@ -125,7 +128,7 @@
this.directory = new File(directory, "amqstore");
}
}
- log.info("AMQStore starting using directory: " + directory);
+ LOG.info("AMQStore starting using directory: " + directory);
this.directory.mkdirs();
if (this.usageManager != null) {
@@ -151,7 +154,7 @@
trace.setMessage("DELETED " + new Date());
Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
asyncDataManager.setMark(location, true);
- log.info("Journal deleted: ");
+ LOG.info("Journal deleted: ");
deleteAllMessages = false;
} catch (IOException e) {
throw e;
@@ -162,7 +165,7 @@
}
referenceStoreAdapter.start();
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
- log.info("Active data files: " + files);
+ LOG.info("Active data files: " + files);
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
public boolean iterate() {
@@ -182,9 +185,9 @@
// need to be recovered when the broker starts up.
if (!referenceStoreAdapter.isStoreValid()) {
- log.warn("The ReferenceStore is not valid - recovering ...");
+ LOG.warn("The ReferenceStore is not valid - recovering ...");
recover();
- log.info("Finished recovering the ReferenceStore");
+ LOG.info("Finished recovering the ReferenceStore");
} else {
Location location = writeTraceMessage("RECOVERED " + new Date(), true);
asyncDataManager.setMark(location, true);
@@ -240,7 +243,7 @@
IOException firstException = null;
referenceStoreAdapter.stop();
try {
- log.debug("Journal close");
+ LOG.debug("Journal close");
asyncDataManager.close();
} catch (Exception e) {
firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
@@ -266,17 +269,17 @@
checkpointTask.wakeup();
}
if (sync) {
- if (log.isDebugEnabled()) {
- log.debug("Waitng for checkpoint to complete.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waitng for checkpoint to complete.");
}
latch.await();
}
referenceStoreAdapter.checkpoint(sync);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- log.warn("Request to start checkpoint failed: " + e, e);
+ LOG.warn("Request to start checkpoint failed: " + e, e);
} catch (IOException e) {
- log.error("checkpoint failed: " + e, e);
+ LOG.error("checkpoint failed: " + e, e);
}
}
@@ -292,8 +295,8 @@
nextCheckpointCountDownLatch = new CountDownLatch(1);
}
try {
- if (log.isDebugEnabled()) {
- log.debug("Checkpoint started.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checkpoint started.");
}
Location newMark = null;
@@ -315,17 +318,17 @@
}
try {
if (newMark != null) {
- if (log.isDebugEnabled()) {
- log.debug("Marking journal at: " + newMark);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Marking journal at: " + newMark);
}
asyncDataManager.setMark(newMark, false);
writeTraceMessage("CHECKPOINT " + new Date(), true);
}
} catch (Exception e) {
- log.error("Failed to mark the Journal: " + e, e);
+ LOG.error("Failed to mark the Journal: " + e, e);
}
- if (log.isDebugEnabled()) {
- log.debug("Checkpoint done.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checkpoint done.");
}
} finally {
latch.countDown();
@@ -344,7 +347,7 @@
Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
asyncDataManager.consolidateDataFilesNotIn(inUse);
} catch (IOException e) {
- log.error("Could not cleanup data files: " + e, e);
+ LOG.error("Could not cleanup data files: " + e, e);
}
}
@@ -441,7 +444,7 @@
referenceStoreAdapter.recoverState();
Location pos = null;
int redoCounter = 0;
- log.info("Journal Recovery Started from: " + asyncDataManager);
+ LOG.info("Journal Recovery Started from: " + asyncDataManager);
long start = System.currentTimeMillis();
ConnectionContext context = new ConnectionContext();
// While we have records in the journal.
@@ -517,23 +520,23 @@
throw new IOException("Invalid journal command type: " + command.getType());
}
} catch (IOException e) {
- log.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
+ LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
}
}
break;
case JournalTrace.DATA_STRUCTURE_TYPE:
JournalTrace trace = (JournalTrace)c;
- log.debug("TRACE Entry: " + trace.getMessage());
+ LOG.debug("TRACE Entry: " + trace.getMessage());
break;
default:
- log.error("Unknown type of record in transaction log which will be discarded: " + c);
+ LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
}
}
}
Location location = writeTraceMessage("RECOVERED " + new Date(), true);
asyncDataManager.setMark(location, true);
long end = System.currentTimeMillis();
- log.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
+ LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
}
private IOException createReadException(Location location, Exception e) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java Thu Aug 9 09:37:49 2007
@@ -1,17 +1,19 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.activemq.store.amq;
import java.io.File;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Thu Aug 9 09:37:49 2007
@@ -1,17 +1,19 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.activemq.store.amq;
import java.io.IOException;
@@ -41,7 +43,7 @@
*/
public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {
- private static final Log log = LogFactory.getLog(AMQTopicMessageStore.class);
+ private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
private TopicReferenceStore topicReferenceStore;
private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
@@ -75,7 +77,7 @@
/**
*/
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
- final boolean debug = log.isDebugEnabled();
+ final boolean debug = LOG.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
@@ -87,12 +89,12 @@
final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
if (!context.isInTransaction()) {
if (debug) {
- log.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
+ LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
}
acknowledge(messageId, location, key);
} else {
if (debug) {
- log.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
+ LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
}
synchronized (this) {
inFlightTxLocations.add(location);
@@ -102,7 +104,7 @@
public void afterCommit() throws Exception {
if (debug) {
- log.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
+ LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
}
synchronized (AMQTopicMessageStore.this) {
inFlightTxLocations.remove(location);
@@ -112,7 +114,7 @@
public void afterRollback() throws Exception {
if (debug) {
- log.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
+ LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
}
synchronized (AMQTopicMessageStore.this) {
inFlightTxLocations.remove(location);
@@ -130,7 +132,7 @@
return true;
}
} catch (Throwable e) {
- log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
+ LOG.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
}
return false;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Thu Aug 9 09:37:49 2007
@@ -178,7 +178,7 @@
public void stop() throws Exception {
}
- synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
+ public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
// All the in-flight transactions get rolled back..
synchronized (inflightTransactions) {
inflightTransactions.clear();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Thu Aug 9 09:37:49 2007
@@ -1,17 +1,19 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.activemq.store.amq;
import org.apache.activemq.command.Message;
@@ -23,7 +25,7 @@
final class RecoveryListenerAdapter implements MessageRecoveryListener {
- static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class);
+ private static final Log LOG = LogFactory.getLog(RecoveryListenerAdapter.class);
private final MessageStore store;
private final MessageRecoveryListener listener;
private int count = 0;
@@ -53,7 +55,7 @@
if (message != null) {
return recoverMessage(message);
} else {
- log.error("Message id " + ref + " could not be recovered from the data store!");
+ LOG.error("Message id " + ref + " could not be recovered from the data store!");
}
return false;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java Thu Aug 9 09:37:49 2007
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -29,6 +29,6 @@
* Used by a timer to keep alive the lock.
* If the method returns false the broker should be terminated
*/
- public boolean keepAlive();
+ boolean keepAlive();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java Thu Aug 9 09:37:49 2007
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -32,7 +32,7 @@
* @version $Revision: $
*/
public class DefaultDatabaseLocker implements DatabaseLocker {
- private static final Log log = LogFactory.getLog(DefaultDatabaseLocker.class);
+ private static final Log LOG = LogFactory.getLog(DefaultDatabaseLocker.class);
private final DataSource dataSource;
private final Statements statements;
private long sleepTime = 1000;
@@ -48,7 +48,7 @@
public void start() throws Exception {
stopping = false;
- log.info("Attempting to acquire the exclusive lock to become the Master broker");
+ LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
while (true) {
try {
@@ -62,12 +62,12 @@
if (stopping) {
throw new Exception("Cannot start broker as being asked to shut down. Interrupted attempt to acquire lock: " + e, e);
}
- log.error("Failed to acquire lock: " + e, e);
+ LOG.error("Failed to acquire lock: " + e, e);
if (null != statement) {
try {
statement.close();
} catch (SQLException e1) {
- log.warn("Caught while closing statement: " + e1, e1);
+ LOG.warn("Caught while closing statement: " + e1, e1);
}
statement = null;
}
@@ -75,17 +75,17 @@
try {
connection.close();
} catch (SQLException e1) {
- log.warn("Caught while closing connection: " + e1, e1);
+ LOG.warn("Caught while closing connection: " + e1, e1);
}
connection = null;
}
}
- log.debug("Sleeping for " + sleepTime + " milli(s) before trying again to get the lock...");
+ LOG.debug("Sleeping for " + sleepTime + " milli(s) before trying again to get the lock...");
Thread.sleep(sleepTime);
}
- log.info("Becoming the master on dataSource: " + dataSource);
+ LOG.info("Becoming the master on dataSource: " + dataSource);
}
public void stop() throws Exception {
@@ -105,7 +105,7 @@
return true;
}
} catch (Exception e) {
- log.error("Failed to update database lock: " + e, e);
+ LOG.error("Failed to update database lock: " + e, e);
}
return false;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Thu Aug 9 09:37:49 2007
@@ -1,17 +1,19 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.activemq.store.jdbc;
import java.io.IOException;
@@ -26,85 +28,57 @@
*/
public interface JDBCAdapter {
- public void setStatements(Statements statementProvider);
-
- public abstract void doCreateTables(TransactionContext c) throws SQLException, IOException;
+ void setStatements(Statements statementProvider);
- public abstract void doDropTables(TransactionContext c) throws SQLException, IOException;
+ void doCreateTables(TransactionContext c) throws SQLException, IOException;
- public abstract void doAddMessage(TransactionContext c, MessageId messageID,
- ActiveMQDestination destination, byte[] data, long expiration)
- throws SQLException, IOException;
+ void doDropTables(TransactionContext c) throws SQLException, IOException;
- public abstract void doAddMessageReference(TransactionContext c, MessageId messageId,
- ActiveMQDestination destination, long expirationTime,
- String messageRef) throws SQLException, IOException;
+ void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException;
- public abstract byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
+ void doAddMessageReference(TransactionContext c, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
- public abstract String doGetMessageReference(TransactionContext c, long id) throws SQLException,
- IOException;
+ byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
- public abstract void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
+ String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
- public abstract void doRecover(TransactionContext c, ActiveMQDestination destination,
- JDBCMessageRecoveryListener listener) throws Exception;
+ void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
- public abstract void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
- String subscriptionName, long seq) throws SQLException, IOException;
+ void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) throws Exception;
- public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination,
- String clientId, String subscriptionName,
- JDBCMessageRecoveryListener listener) throws Exception;
+ void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException, IOException;
- public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination,
- String clientId, String subscriptionName, long seq,
- int maxReturned, JDBCMessageRecoveryListener listener)
+ void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, JDBCMessageRecoveryListener listener)
throws Exception;
- public abstract void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo,
- boolean retroactive) throws SQLException, IOException;
+ void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, int maxReturned,
+ JDBCMessageRecoveryListener listener) throws Exception;
+
+ void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo, boolean retroactive) throws SQLException, IOException;
- public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,
- ActiveMQDestination destination, String clientId,
- String subscriptionName) throws SQLException,
- IOException;
+ SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
- public abstract long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
- IOException;
+ long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException;
- public abstract void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName)
- throws SQLException, IOException;
+ void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException;
- public abstract void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName,
- String clientId, String subscriptionName) throws SQLException,
- IOException;
+ void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName) throws SQLException, IOException;
- public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
+ void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
- public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,
- IOException;
+ long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException;
- public abstract Set doGetDestinations(TransactionContext c) throws SQLException, IOException;
+ Set doGetDestinations(TransactionContext c) throws SQLException, IOException;
- public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences);
+ void setUseExternalMessageReferences(boolean useExternalMessageReferences);
- public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,
- ActiveMQDestination destination)
- throws SQLException, IOException;
+ SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
- public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
- String clientId, String subscriptionName)
- throws SQLException, IOException;
+ int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
- public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
- IOException;
+ int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
- public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
- int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
+ void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
- public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
- ActiveMQDestination destination, String clientId,
- String subscriberName) throws SQLException,
- IOException;
+ long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Thu Aug 9 09:37:49 2007
@@ -62,7 +62,7 @@
public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
BrokerServiceAware {
- private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
+ private static final Log LOG = LogFactory.getLog(JDBCPersistenceAdapter.class);
private static FactoryFinder factoryFinder = new FactoryFinder(
"META-INF/services/org/apache/activemq/store/jdbc/");
@@ -155,7 +155,7 @@
try {
getAdapter().doCreateTables(transactionContext);
} catch (SQLException e) {
- log.warn("Cannot create tables due to: " + e);
+ LOG.warn("Cannot create tables due to: " + e);
JDBCPersistenceAdapter.log("Failure Details: ", e);
}
} finally {
@@ -166,7 +166,7 @@
if (isUseDatabaseLock()) {
DatabaseLocker service = getDatabaseLocker();
if (service == null) {
- log.warn("No databaseLocker configured for the JDBC Persistence Adapter");
+ LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
} else {
service.start();
}
@@ -202,13 +202,13 @@
public void cleanup() {
TransactionContext c = null;
try {
- log.debug("Cleaning up old messages.");
+ LOG.debug("Cleaning up old messages.");
c = getTransactionContext();
getAdapter().doDeleteOldMessages(c);
} catch (IOException e) {
- log.warn("Old message cleanup failed due to: " + e, e);
+ LOG.warn("Old message cleanup failed due to: " + e, e);
} catch (SQLException e) {
- log.warn("Old message cleanup failed due to: " + e);
+ LOG.warn("Old message cleanup failed due to: " + e);
JDBCPersistenceAdapter.log("Failure Details: ", e);
} finally {
if (c != null) {
@@ -217,7 +217,7 @@
} catch (Throwable e) {
}
}
- log.debug("Cleanup done.");
+ LOG.debug("Cleanup done.");
}
}
@@ -290,14 +290,14 @@
try {
adapter = (DefaultJDBCAdapter)factoryFinder.newInstance(dirverName);
- log.info("Database driver recognized: [" + dirverName + "]");
+ LOG.info("Database driver recognized: [" + dirverName + "]");
} catch (Throwable e) {
- log.warn("Database driver NOT recognized: [" + dirverName
+ LOG.warn("Database driver NOT recognized: [" + dirverName
+ "]. Will use default JDBC implementation.");
}
} catch (SQLException e) {
- log
+ LOG
.warn("JDBC error occurred while trying to detect database type. Will use default JDBC implementation: "
+ e.getMessage());
JDBCPersistenceAdapter.log("Failure Details: ", e);
@@ -417,13 +417,13 @@
this.useDatabaseLock = useDatabaseLock;
}
- static public void log(String msg, SQLException e) {
+ public static void log(String msg, SQLException e) {
String s = msg + e.getMessage();
while (e.getNextException() != null) {
e = e.getNextException();
s += ", due to: " + e.getMessage();
}
- log.debug(s, e);
+ LOG.debug(s, e);
}
public Statements getStatements() {
@@ -454,7 +454,7 @@
}
}
} catch (IOException e) {
- log.error("Failed to get database when trying keepalive: " + e, e);
+ LOG.error("Failed to get database when trying keepalive: " + e, e);
}
if (stop) {
stopBroker();
@@ -463,11 +463,11 @@
protected void stopBroker() {
// we can no longer keep the lock so lets fail
- log.info("No longer able to keep the exclusive lock so giving up being a master");
+ LOG.info("No longer able to keep the exclusive lock so giving up being a master");
try {
brokerService.stop();
} catch (Exception e) {
- log.warn("Failed to stop broker");
+ LOG.warn("Failed to stop broker");
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java Thu Aug 9 09:37:49 2007
@@ -35,7 +35,7 @@
*/
public class TransactionContext {
- private static final Log log = LogFactory.getLog(TransactionContext.class);
+ private static final Log LOG = LogFactory.getLog(TransactionContext.class);
private final DataSource dataSource;
private Connection connection;
@@ -134,7 +134,7 @@
connection.close();
}
} catch (Throwable e) {
- log.warn("Close failed: " + e.getMessage(), e);
+ LOG.warn("Close failed: " + e.getMessage(), e);
} finally {
connection = null;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BytesJDBCAdapter.java Thu Aug 9 09:37:49 2007
@@ -21,30 +21,29 @@
import java.sql.SQLException;
/**
- * This JDBCAdapter inserts and extracts BLOB data using the
- * setBytes()/getBytes() operations.
- *
- * The databases/JDBC drivers that use this adapter are:
+ * This JDBCAdapter inserts and extracts BLOB data using the
+ * setBytes()/getBytes() operations. The databases/JDBC drivers that use this
+ * adapter are:
*
* @org.apache.xbean.XBean element="bytesJDBCAdapter"
- *
* @version $Revision: 1.2 $
*/
public class BytesJDBCAdapter extends DefaultJDBCAdapter {
-
/**
- * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet, int)
+ * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#getBinaryData(java.sql.ResultSet,
+ * int)
*/
protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
return rs.getBytes(index);
}
-
+
/**
- * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement, int, byte[])
+ * @see org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter#setBinaryData(java.sql.PreparedStatement,
+ * int, byte[])
*/
protected void setBinaryData(PreparedStatement s, int index, byte[] data) throws SQLException {
s.setBytes(index, data);
}
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Aug 9 09:37:49 2007
@@ -1,17 +1,19 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.activemq.store.jdbc.adapter;
import java.io.IOException;
@@ -50,7 +52,7 @@
*/
public class DefaultJDBCAdapter implements JDBCAdapter {
- private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
+ private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements;
protected boolean batchStatments = true;
@@ -87,15 +89,15 @@
// This will fail usually since the tables will be
// created already.
try {
- log.debug("Executing SQL: " + createStatments[i]);
+ LOG.debug("Executing SQL: " + createStatments[i]);
boolean rc = s.execute(createStatments[i]);
} catch (SQLException e) {
if (alreadyExists) {
- log.debug("Could not create JDBC tables; The message table already existed."
+ LOG.debug("Could not create JDBC tables; The message table already existed."
+ " Failure was: " + createStatments[i] + " Message: " + e.getMessage()
+ " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode());
} else {
- log.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
+ LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
+ createStatments[i] + " Message: " + e.getMessage() + " SQLState: "
+ e.getSQLState() + " Vendor code: " + e.getErrorCode());
JDBCPersistenceAdapter.log("Failure details: ", e);
@@ -122,7 +124,7 @@
try {
boolean rc = s.execute(dropStatments[i]);
} catch (SQLException e) {
- log.warn("Could not drop JDBC tables; they may not exist." + " Failure was: "
+ LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: "
+ dropStatments[i] + " Message: " + e.getMessage() + " SQLState: "
+ e.getSQLState() + " Vendor code: " + e.getErrorCode());
JDBCPersistenceAdapter.log("Failure details: ", e);
@@ -564,11 +566,11 @@
public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
PreparedStatement s = null;
try {
- log.debug("Executing SQL: " + statements.getDeleteOldMessagesStatement());
+ LOG.debug("Executing SQL: " + statements.getDeleteOldMessagesStatement());
s = c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
s.setLong(1, System.currentTimeMillis());
int i = s.executeUpdate();
- log.debug("Deleted " + i + " old message(s).");
+ LOG.debug("Deleted " + i + " old message(s).");
} finally {
close(s);
}
@@ -600,14 +602,14 @@
return result;
}
- static private void close(PreparedStatement s) {
+ private static void close(PreparedStatement s) {
try {
s.close();
} catch (Throwable e) {
}
}
- static private void close(ResultSet rs) {
+ private static void close(ResultSet rs) {
try {
rs.close();
} catch (Throwable e) {
@@ -710,7 +712,7 @@
if (listener.recoverMessageReference(rs.getString(1))) {
count++;
} else {
- log.debug("Stopped recover next messages");
+ LOG.debug("Stopped recover next messages");
}
}
} else {
@@ -718,7 +720,7 @@
if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
count++;
} else {
- log.debug("Stopped recover next messages");
+ LOG.debug("Stopped recover next messages");
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/HsqldbJDBCAdapter.java Thu Aug 9 09:37:49 2007
@@ -19,14 +19,13 @@
import org.apache.activemq.store.jdbc.Statements;
/**
- *
* @version $Revision: 1.2 $
*/
public class HsqldbJDBCAdapter extends BytesJDBCAdapter {
-
+
public void setStatements(Statements statements) {
statements.setBinaryDataType("OTHER");
super.setStatements(statements);
}
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?view=diff&rev=564271&r1=564270&r2=564271
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Thu Aug 9 09:37:49 2007
@@ -66,8 +66,7 @@
private UsageManager usageManager;
- public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore,
- ActiveMQDestination destination) {
+ public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.longTermStore = checkpointStore;
@@ -93,20 +92,23 @@
final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
if (!context.isInTransaction()) {
- if (debug)
+ if (debug) {
log.debug("Journalled message add for: " + id + ", at: " + location);
+ }
addMessage(message, location);
} else {
- if (debug)
+ if (debug) {
log.debug("Journalled transacted message add for: " + id + ", at: " + location);
+ }
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
- if (debug)
+ if (debug) {
log.debug("Transacted message add commit for: " + id + ", at: " + location);
+ }
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
addMessage(message, location);
@@ -114,8 +116,9 @@
}
public void afterRollback() throws Exception {
- if (debug)
+ if (debug) {
log.debug("Transacted message add rollback for: " + id + ", at: " + location);
+ }
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
}
@@ -141,8 +144,7 @@
longTermStore.addMessage(context, message);
}
} catch (Throwable e) {
- log.warn("Could not replay add for message '" + message.getMessageId()
- + "'. Message may have already been added. reason: " + e);
+ log.warn("Could not replay add for message '" + message.getMessageId() + "'. Message may have already been added. reason: " + e);
}
}
@@ -156,22 +158,25 @@
final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
if (!context.isInTransaction()) {
- if (debug)
+ if (debug) {
log.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
+ }
removeMessage(ack, location);
} else {
- if (debug)
+ if (debug) {
log.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: "
+ location);
+ }
synchronized (this) {
inFlightTxLocations.add(location);
}
transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
- if (debug)
+ if (debug) {
log.debug("Transacted message remove commit for: " + ack.getLastMessageId()
+ ", at: " + location);
+ }
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
removeMessage(ack, location);
@@ -212,8 +217,7 @@
longTermStore.removeMessage(context, messageAck);
}
} catch (Throwable e) {
- log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
- + "'. Message may have already been acknowledged. reason: " + e);
+ log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
}
@@ -375,8 +379,7 @@
return destination;
}
- public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
- String messageRef) throws IOException {
+ public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
throw new IOException("The journal does not support message references.");
}