You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/01/04 02:48:22 UTC

svn commit: r492380 [2/2] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/impl/async/ main/java/org/apache/activemq/store/ main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/store/quick/ t...

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,205 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.quick;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A MessageStore that uses a Journal to store it's messages.
+ * 
+ * @version $Revision: 1.13 $
+ */
+public class QuickTopicMessageStore extends QuickMessageStore implements TopicMessageStore {
+    
+    private static final Log log = LogFactory.getLog(QuickTopicMessageStore.class);
+
+    private TopicReferenceStore topicReferenceStore;
+	private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
+    
+    public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore checkpointStore, ActiveMQTopic destinationName) {
+        super(adapter, checkpointStore, destinationName);
+        this.topicReferenceStore = checkpointStore;
+    }
+    
+    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+        this.peristenceAdapter.checkpoint(true);
+        topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
+    }
+    
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{
+        this.peristenceAdapter.checkpoint(true);
+        topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener));        
+    }
+
+    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+        return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
+    }
+
+    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+        this.peristenceAdapter.checkpoint(true);
+        topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
+    }
+
+    public void addMessage(ConnectionContext context, Message message) throws IOException {
+        super.addMessage(context, message);
+    }
+    
+    /**
+     */
+    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+        final boolean debug = log.isDebugEnabled();
+        
+        JournalTopicAck ack = new JournalTopicAck();
+        ack.setDestination(destination);
+        ack.setMessageId(messageId);
+        ack.setMessageSequenceId(messageId.getBrokerSequenceId());
+        ack.setSubscritionName(subscriptionName);
+        ack.setClientId(clientId);
+        ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
+        final Location location = peristenceAdapter.writeCommand(ack, false);
+        
+        final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);        
+        if( !context.isInTransaction() ) {
+            if( debug )
+                log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
+            acknowledge(messageId, location, key);
+        } else {
+            if( debug )
+                log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
+            synchronized (this) {
+                inFlightTxLocations.add(location);
+            }
+            transactionStore.acknowledge(this, ack, location);
+            context.getTransaction().addSynchronization(new Synchronization(){
+                public void afterCommit() throws Exception {                    
+                    if( debug )
+                        log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
+                    synchronized (QuickTopicMessageStore.this) {
+                        inFlightTxLocations.remove(location);
+                        acknowledge(messageId, location, key);
+                    }
+                }
+                public void afterRollback() throws Exception {                    
+                    if( debug )
+                        log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
+                    synchronized (QuickTopicMessageStore.this) {
+                        inFlightTxLocations.remove(location);
+                    }
+                }
+            });
+        }
+        
+    }
+    
+    public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+        try {
+            SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
+            if( sub != null ) {
+                topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
+            }
+        }
+        catch (Throwable e) {
+            log.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
+        }
+    }
+        
+
+    /**
+     * @param messageId
+     * @param location
+     * @param key
+     */
+    private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) {
+        synchronized(this) {
+		    lastLocation = location;
+		    ackedLastAckLocations.put(key, messageId);
+		}
+    }
+    
+    public Location checkpoint() throws IOException {
+        
+		final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
+
+        // swap out the hash maps..
+        synchronized (this) {
+            cpAckedLastAckLocations = this.ackedLastAckLocations;
+            this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
+        }
+
+        return super.checkpoint( new Callback() {
+            public void execute() throws Exception {
+
+                // Checkpoint the acknowledged messages.
+                Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
+                while (iterator.hasNext()) {
+                    SubscriptionKey subscriptionKey = iterator.next();
+                    MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
+                    topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
+                }
+
+            }
+        });
+
+    }
+
+    /**
+	 * @return Returns the longTermStore.
+	 */
+	public TopicReferenceStore getTopicReferenceStore() {
+		return topicReferenceStore;
+	}
+
+    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+        topicReferenceStore.deleteSubscription(clientId, subscriptionName);
+    }
+    
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        return topicReferenceStore.getAllSubscriptions();
+    }
+
+    
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        this.peristenceAdapter.checkpoint(true);
+        return topicReferenceStore.getMessageCount(clientId,subscriberName);
+    }
+    
+    public void resetBatching(String clientId,String subscriptionName) {
+        topicReferenceStore.resetBatching(clientId,subscriptionName);
+    }
+
+    
+
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,338 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.quick;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.JournalTransaction;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+
+
+/**
+ */
+public class QuickTransactionStore implements TransactionStore {
+
+    private final QuickPersistenceAdapter peristenceAdapter;
+    Map<TransactionId, Tx> inflightTransactions = new LinkedHashMap<TransactionId, Tx>();
+    Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
+    private boolean doingRecover;
+
+    
+    public static class TxOperation {
+        
+        static final byte ADD_OPERATION_TYPE       = 0;
+        static final byte REMOVE_OPERATION_TYPE    = 1;
+        static final byte ACK_OPERATION_TYPE       = 3;
+        
+        public byte operationType;
+        public QuickMessageStore store;
+        public Object data;
+        
+        public TxOperation(byte operationType, QuickMessageStore store, Object data) {
+            this.operationType=operationType;
+            this.store=store;
+            this.data=data;
+        }
+        
+    }
+    /**
+     * Operations
+     * @version $Revision: 1.6 $
+     */
+    public static class Tx {
+
+        private final Location location;
+        private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
+
+        public Tx(Location location) {
+            this.location=location;
+        }
+
+        public void add(QuickMessageStore store, Message msg) {
+            operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
+        }
+
+        public void add(QuickMessageStore store, MessageAck ack) {
+            operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
+        }
+
+        public void add(QuickTopicMessageStore store, JournalTopicAck ack) {
+            operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
+        }
+        
+        public Message[] getMessages() {
+            ArrayList<Object> list = new ArrayList<Object>();
+            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
+                TxOperation op = iter.next();
+                if( op.operationType==TxOperation.ADD_OPERATION_TYPE ) {
+                    list.add(op.data);
+                }
+            }
+            Message rc[] = new Message[list.size()];
+            list.toArray(rc);
+            return rc;
+        }
+
+        public MessageAck[] getAcks() {
+            ArrayList<Object> list = new ArrayList<Object>();
+            for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
+                TxOperation op = iter.next();
+                if( op.operationType==TxOperation.REMOVE_OPERATION_TYPE ) {
+                    list.add(op.data);
+                }
+            }
+            MessageAck rc[] = new MessageAck[list.size()];
+            list.toArray(rc);
+            return rc;
+        }
+
+        public ArrayList<TxOperation> getOperations() {
+            return operations;
+        }
+
+    }
+
+    public QuickTransactionStore(QuickPersistenceAdapter adapter) {
+        this.peristenceAdapter = adapter;
+    }
+
+    /**
+     * @throws IOException
+     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+     */
+    public void prepare(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=inflightTransactions.remove(txid);
+        }
+        if(tx==null)
+            return;
+        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
+        synchronized(preparedTransactions){
+            preparedTransactions.put(txid,tx);
+        }
+    }
+    
+    /**
+     * @throws IOException
+     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+     */
+    public void replayPrepare(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=inflightTransactions.remove(txid);
+        }
+        if(tx==null)
+            return;
+        synchronized(preparedTransactions){
+            preparedTransactions.put(txid,tx);
+        }
+    }
+
+    public Tx getTx(TransactionId txid,Location location){
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=inflightTransactions.get(txid);
+        }
+        if(tx==null){
+            tx=new Tx(location);
+            inflightTransactions.put(txid,tx);
+        }
+        return tx;
+    }
+
+    /**
+     * @throws XAException
+     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+     */
+    public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
+        Tx tx;
+        if(wasPrepared){
+            synchronized(preparedTransactions){
+                tx=preparedTransactions.remove(txid);
+            }
+        }else{
+            synchronized(inflightTransactions){
+                tx=inflightTransactions.remove(txid);
+            }
+        }
+        if(tx==null)
+            return;
+        if(txid.isXATransaction()){
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
+        }else{
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
+                    true);
+        }
+    }
+
+    /**
+     * @throws XAException
+     * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
+     */
+    public Tx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
+        if(wasPrepared){
+            synchronized(preparedTransactions){
+                return preparedTransactions.remove(txid);
+            }
+        }else{
+            synchronized(inflightTransactions){
+                return inflightTransactions.remove(txid);
+            }
+        }
+    }
+
+    /**
+     * @throws IOException
+     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+     */
+    public void rollback(TransactionId txid) throws IOException{
+        Tx tx=null;
+        synchronized(inflightTransactions){
+            tx=inflightTransactions.remove(txid);
+        }
+        if(tx!=null)
+            synchronized(preparedTransactions){
+                tx=preparedTransactions.remove(txid);
+            }
+        if(tx!=null){
+            if(txid.isXATransaction()){
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
+            }else{
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
+                        true);
+            }
+        }
+    }
+
+    /**
+     * @throws IOException
+     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+     */
+    public void replayRollback(TransactionId txid) throws IOException{
+        boolean inflight=false;
+        synchronized(inflightTransactions){
+            inflight=inflightTransactions.remove(txid)!=null;
+        }
+        if(inflight){
+            synchronized(preparedTransactions){
+                preparedTransactions.remove(txid);
+            }
+        }
+    }
+    
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+    }
+    
+    synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
+        // All the in-flight transactions get rolled back..
+        synchronized(inflightTransactions){
+            inflightTransactions.clear();
+        }
+        this.doingRecover=true;
+        try{
+            Map<TransactionId, Tx> txs=null;
+            synchronized(preparedTransactions){
+                txs=new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
+            }
+            for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
+                Object txid=iter.next();
+                Tx tx=txs.get(txid);
+                listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
+            }
+        }finally{
+            this.doingRecover=false;
+        }
+    }
+
+    /**
+     * @param message
+     * @throws IOException
+     */
+    void addMessage(QuickMessageStore store, Message message, Location location) throws IOException {
+        Tx tx = getTx(message.getTransactionId(), location);
+        tx.add(store, message);
+    }
+
+    /**
+     * @param ack
+     * @throws IOException
+     */
+    public void removeMessage(QuickMessageStore store, MessageAck ack, Location location) throws IOException {
+        Tx tx = getTx(ack.getTransactionId(), location);
+        tx.add(store, ack);
+    }
+    
+    
+    public void acknowledge(QuickTopicMessageStore store, JournalTopicAck ack, Location location) {
+        Tx tx = getTx(ack.getTransactionId(), location);
+        tx.add(store, ack);
+    }
+
+
+    public Location checkpoint() throws IOException{
+        // Nothing really to checkpoint.. since, we don't
+        // checkpoint tx operations in to long term store until they are committed.
+        // But we keep track of the first location of an operation
+        // that was associated with an active tx. The journal can not
+        // roll over active tx records.
+        Location rc=null;
+        synchronized(inflightTransactions){
+            for(Iterator<Tx> iter=inflightTransactions.values().iterator();iter.hasNext();){
+                Tx tx=iter.next();
+                Location location=tx.location;
+                if(rc==null||rc.compareTo(location)<0){
+                    rc=location;
+                }
+            }
+        }
+        synchronized(preparedTransactions){
+            for(Iterator<Tx> iter=preparedTransactions.values().iterator();iter.hasNext();){
+                Tx tx=iter.next();
+                Location location=tx.location;
+                if(rc==null||rc.compareTo(location)<0){
+                    rc=location;
+                }
+            }
+            return rc;
+        }
+    }
+
+    public boolean isDoingRecover() {
+        return doingRecover;
+    }
+
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java Wed Jan  3 17:48:20 2007
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.quick;
+
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+
+final class RecoveryListenerAdapter implements MessageRecoveryListener {
+
+	private final MessageStore store;
+	private final MessageRecoveryListener listener;
+
+	RecoveryListenerAdapter(MessageStore store, MessageRecoveryListener listener) {
+		this.store = store;
+		this.listener = listener;
+	}
+
+	public void finished() {
+		listener.finished();
+	}
+
+	public boolean hasSpace() {
+		return listener.hasSpace();
+	}
+
+	public void recoverMessage(Message message) throws Exception {
+		listener.recoverMessage(message);
+	}
+
+	public void recoverMessageReference(MessageId ref) throws Exception {
+		listener.recoverMessage( this.store.getMessage(ref) );
+	}
+}
\ No newline at end of file

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html?view=auto&rev=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/package.html Wed Jan  3 17:48:20 2007
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+	
+</p>
+
+</body>
+</html>

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java Wed Jan  3 17:48:20 2007
@@ -54,14 +54,14 @@
     }
 
     
-    @Override
-    public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception {
-    	// TODO: this test is currently failing in base class.. overriden to avoid failure
-    }
-    
-    @Override
-    public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
-    	// TODO: this test is currently failing in base class.. overriden to avoid failure
-    }
+//    @Override
+//    public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception {
+//    	// TODO: this test is currently failing in base class.. overriden to avoid failure
+//    }
+//    
+//    @Override
+//    public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
+//    	// TODO: this test is currently failing in base class.. overriden to avoid failure
+//    }
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties?view=diff&rev=492380&r1=492379&r2=492380
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties Wed Jan  3 17:48:20 2007
@@ -18,7 +18,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
 
 log4j.logger.org.apache.activemq.spring=WARN