You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/21 09:11:04 UTC

svn commit: r477567 [2/2] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/store/rapid/ test/java/org/apache/activemq/perf/

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=477567&r1=477566&r2=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Tue Nov 21 00:11:03 2006
@@ -15,32 +15,27 @@
 package org.apache.activemq.store.rapid;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.activeio.journal.active.Location;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.Store;
-import org.apache.activemq.kaha.StringMarshaller;
+import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.util.SubscriptionKey;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.activemq.store.kahadaptor.ConsumerMessageRef;
+import org.apache.activemq.store.kahadaptor.ConsumerMessageRefMarshaller;
+import org.apache.activemq.store.kahadaptor.TopicSubAck;
+import org.apache.activemq.store.kahadaptor.TopicSubContainer;
 
 
 /**
@@ -50,78 +45,61 @@
  */
 public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore{
 
-    private static final Log log=LogFactory.getLog(RapidTopicMessageStore.class);
-    private HashMap ackedLastAckLocations=new HashMap();
-    private final MapContainer subscriberContainer;
-    private final MapContainer ackContainer;
-    private final Store store;
-    private Map subscriberAcks=new ConcurrentHashMap();
-
-    public RapidTopicMessageStore(RapidPersistenceAdapter adapter,ActiveMQTopic destination,
-            MapContainer messageContainer,MapContainer subsContainer,MapContainer ackContainer) throws IOException{
-        super(adapter,destination,messageContainer);
-        this.subscriberContainer=subsContainer;
+    private ListContainer ackContainer;
+    private Map subscriberContainer;
+    private Store store;
+    private Map subscriberMessages=new ConcurrentHashMap();
+
+    public RapidTopicMessageStore(RapidPersistenceAdapter adapter, Store store,ListContainer messageContainer,ListContainer ackContainer,
+            MapContainer subsContainer,ActiveMQDestination destination,int maximumCacheSize) throws IOException{
+        super(adapter,destination,messageContainer,maximumCacheSize);
+        this.store=store;
         this.ackContainer=ackContainer;
-        this.store=adapter.getStore();
+        subscriberContainer=subsContainer;
+        // load all the Ack containers
         for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
             Object key=i.next();
-            addSubscriberAckContainer(key);
+            addSubscriberMessageContainer(key);
         }
     }
 
-    public void recoverSubscription(String clientId,String subscriptionName,final MessageRecoveryListener listener)
-            throws Exception{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer)subscriberAcks.get(key);
-        if(list!=null){
-            for(Iterator i=list.iterator();i.hasNext();){
-                Object msg=messageContainer.get(i.next());
-                if(msg!=null){
-                    if(msg.getClass()==String.class){
-                        listener.recoverMessageReference((String)msg);
-                    }else{
-                        listener.recoverMessage((Message)msg);
-                    }
-                }
-                listener.finished();
+    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+        int subscriberCount=subscriberMessages.size();
+        if(subscriberCount>0){
+            final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
+            final RapidMessageReference md = new RapidMessageReference(message, location);
+            StoreEntry messageEntry=messageContainer.placeLast(md);
+            TopicSubAck tsa=new TopicSubAck();
+            tsa.setCount(subscriberCount);
+            tsa.setMessageEntry(messageEntry);
+            StoreEntry ackEntry=ackContainer.placeLast(tsa);
+            for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+                TopicSubContainer container=(TopicSubContainer)i.next();
+                ConsumerMessageRef ref=new ConsumerMessageRef();
+                ref.setAckEntry(ackEntry);
+                ref.setMessageEntry(messageEntry);
+                container.getListContainer().add(ref);
             }
-        }else{
-            listener.finished();
         }
     }
 
-    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
-            MessageRecoveryListener listener) throws Exception{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer)subscriberAcks.get(key);
-        if(list!=null){
-            boolean startFound=false;
-            int count=0;
-            for(Iterator i=list.iterator();i.hasNext()&&count<maxReturned;){
-                Object msg=messageContainer.get(i.next());
-                if(msg!=null){
-                    if(msg.getClass()==String.class){
-                        String ref=msg.toString();
-                        if(startFound||lastMessageId==null){
-                            listener.recoverMessageReference(ref);
-                            count++;
-                        }else if(!startFound||ref.equals(lastMessageId.toString())){
-                            startFound=true;
-                        }
+    public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
+            MessageId messageId) throws IOException{
+        String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
+        if(container!=null){
+            ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        ackContainer.remove(ref.getAckEntry());
+                        messageContainer.remove(tsa.getMessageEntry());
                     }else{
-                        Message message=(Message)msg;
-                        if(startFound||lastMessageId==null){
-                            listener.recoverMessage(message);
-                            count++;
-                        }else if(!startFound&&message.getMessageId().equals(lastMessageId)){
-                            startFound=true;
-                        }
+                        ackContainer.update(ref.getAckEntry(),tsa);
                     }
                 }
-                listener.finished();
             }
-        }else{
-            listener.finished();
         }
     }
 
@@ -129,7 +107,7 @@
         return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
     }
 
-    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+    public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
             throws IOException{
         SubscriptionInfo info=new SubscriptionInfo();
         info.setDestination(destination);
@@ -142,122 +120,82 @@
         if(!subscriberContainer.containsKey(key)){
             subscriberContainer.put(key,info);
         }
-        addSubscriberAckContainer(key);
-    }
-
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        int subscriberCount=subscriberAcks.size();
-        if(subscriberCount>0){
-            String id=message.getMessageId().toString();
-            ackContainer.put(id,new AtomicInteger(subscriberCount));
-            for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
-                Object key=i.next();
-                ListContainer container=store.getListContainer(key,"durable-subs");
-                container.add(id);
-            }
-            super.addMessage(context,message);
-        }
+        addSubscriberMessageContainer(key);
     }
 
-    /**
-     */
-    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
-            throws IOException{
-        final boolean debug=log.isDebugEnabled();
-        JournalTopicAck ack=new JournalTopicAck();
-        ack.setDestination(destination);
-        ack.setMessageId(messageId);
-        ack.setMessageSequenceId(messageId.getBrokerSequenceId());
-        ack.setSubscritionName(subscriptionName);
-        ack.setClientId(clientId);
-        ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
-        final Location location=peristenceAdapter.writeCommand(ack,false);
-        final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
-        if(!context.isInTransaction()){
-            if(debug)
-                log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
-            acknowledge(messageId,location,key);
-        }else{
-            if(debug)
-                log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
-            synchronized(this){
-                inFlightTxLocations.add(location);
-            }
-            transactionStore.acknowledge(this,ack,location);
-            context.getTransaction().addSynchronization(new Synchronization(){
-
-                public void afterCommit() throws Exception{
-                    if(debug)
-                        log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
-                    synchronized(RapidTopicMessageStore.this){
-                        inFlightTxLocations.remove(location);
-                        acknowledge(messageId,location,key);
-                    }
-                }
-
-                public void afterRollback() throws Exception{
-                    if(debug)
-                        log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
-                    synchronized(RapidTopicMessageStore.this){
-                        inFlightTxLocations.remove(location);
+    public synchronized void deleteSubscription(String clientId,String subscriptionName){
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        subscriberContainer.remove(key);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        ackContainer.remove(ref.getAckEntry());
+                        messageContainer.remove(tsa.getMessageEntry());
+                    }else{
+                        ackContainer.update(ref.getAckEntry(),tsa);
                     }
                 }
-            });
+            }
         }
     }
 
-    public void replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,MessageId messageId){
-        try{
-            synchronized(this){
-                String subcriberId=getSubscriptionKey(clientId,subscritionName);
-                String id=messageId.toString();
-                ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
-                if(container!=null){
-                    // container.remove(id);
-                    container.removeFirst();
-                    AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
-                    if(count!=null){
-                        if(count.decrementAndGet()>0){
-                            ackContainer.put(id,count);
-                        }else{
-                            // no more references to message messageContainer so remove it
-                            messageContainer.remove(messageId.toString());
-                        }
-                    }
+    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
+            throws Exception{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        if(container!=null){
+            for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+                ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+                RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(ref
+                        .getMessageEntry());
+                if(messageReference!=null){
+                    Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
+                    listener.recoverMessage(m);
                 }
             }
-        }catch(Throwable e){
-            log.debug("Could not replay acknowledge for message '"+messageId
-                    +"'.  Message may have already been acknowledged. reason: "+e);
         }
+        listener.finished();
     }
 
-    /**
-     * @param messageId
-     * @param location
-     * @param key
-     */
-    private void acknowledge(MessageId messageId,Location location,SubscriptionKey key){
-        synchronized(this){
-            lastLocation=location;
-            ackedLastAckLocations.put(key,messageId);
-            String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName());
-            String id=messageId.toString();
-            ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
-            if(container!=null){
-                // container.remove(id);
-                container.removeFirst();
-                AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
-                if(count!=null){
-                    if(count.decrementAndGet()>0){
-                        ackContainer.put(id,count);
-                    }else{
-                        // no more references to message messageContainer so remove it
-                        messageContainer.remove(messageId.toString());
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+            MessageRecoveryListener listener) throws Exception{
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        if(container!=null){
+            int count=0;
+            StoreEntry entry=container.getBatchEntry();
+            if(entry==null){
+                entry=container.getListContainer().getFirst();
+            }else{
+                entry=container.getListContainer().refresh(entry);
+                entry=container.getListContainer().getNext(entry);
+            }
+            if(entry!=null){
+                do{
+                    ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
+                    RapidMessageReference messageReference=(RapidMessageReference)messageContainer.get(consumerRef
+                            .getMessageEntry());
+                    if(messageReference!=null){
+                        Message m=(Message)peristenceAdapter.readCommand(messageReference.getLocation());
+                        listener.recoverMessage(m);
+                        count++;
                     }
-                }
+                    container.setBatchEntry(entry);
+                    entry=container.getListContainer().getNext(entry);
+                }while(entry!=null&&count<maxReturned);
             }
         }
+        listener.finished();
+    }
+
+    
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+        return (SubscriptionInfo[])subscriberContainer.values().toArray(
+                new SubscriptionInfo[subscriberContainer.size()]);
     }
 
     protected String getSubscriptionKey(String clientId,String subscriberName){
@@ -266,73 +204,102 @@
         return result;
     }
 
-    public Location checkpoint() throws IOException{
-        ArrayList cpAckedLastAckLocations;
-        // swap out the hash maps..
-        synchronized(this){
-            cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values());
-            this.ackedLastAckLocations=new HashMap();
-        }
-        Location rc=super.checkpoint();
-        if(!cpAckedLastAckLocations.isEmpty()){
-            Collections.sort(cpAckedLastAckLocations);
-            Location t=(Location)cpAckedLastAckLocations.get(0);
-            if(rc==null||t.compareTo(rc)<0){
-                rc=t;
-            }
-        }
-        return rc;
+    protected void addSubscriberMessageContainer(Object key) throws IOException{
+        ListContainer container=store.getListContainer(key,"topic-subs");
+        Marshaller marshaller=new ConsumerMessageRefMarshaller();
+        container.setMarshaller(marshaller);
+        TopicSubContainer tsc=new TopicSubContainer(container);
+        subscriberMessages.put(key,tsc);
     }
 
-    public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        subscriberContainer.remove(key);
-        ListContainer list=(ListContainer)subscriberAcks.get(key);
-        for(Iterator i=list.iterator();i.hasNext();){
-            String id=i.next().toString();
-            AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
-            if(count!=null){
-                if(count.decrementAndGet()>0){
-                    ackContainer.put(id,count);
-                }else{
-                    // no more references to message messageContainer so remove it
-                    messageContainer.remove(id);
-                }
-            }
-        }
+    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+        String key=getSubscriptionKey(clientId,subscriberName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        return container.getListContainer().size();
     }
 
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
-        return (SubscriptionInfo[])subscriberContainer.values().toArray(
-                new SubscriptionInfo[subscriberContainer.size()]);
+    /**
+     * @param context
+     * @param messageId
+     * @param expirationTime
+     * @param messageRef
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext,
+     *      org.apache.activemq.command.MessageId, long, java.lang.String)
+     */
+    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+            throws IOException{
+       throw new IOException("Not supported");
     }
 
-    protected void addSubscriberAckContainer(Object key) throws IOException{
-        ListContainer container=store.getListContainer(key,"durable-subs");
-        Marshaller marshaller=new StringMarshaller();
-        container.setMarshaller(marshaller);
-        subscriberAcks.put(key,container);
+       
+
+    /**
+     * @param identity
+     * @return String
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#getMessageReference(org.apache.activemq.command.MessageId)
+     */
+    public String getMessageReference(MessageId identity) throws IOException{
+        return null;
     }
 
+   
 
-    public int getMessageCount(String clientId,String subscriberName) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriberName);
-        ListContainer list=(ListContainer)subscriberAcks.get(key);
-        return list.size();
+    /**
+     * @param context
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
+     */
+    public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
+        messageContainer.clear();
+        ackContainer.clear();
+        for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+            TopicSubContainer container=(TopicSubContainer)i.next();
+            container.getListContainer().clear();
+        }
     }
 
-    public void resetBatching(String clientId,String subscriptionName,MessageId nextId){
+    
+    public synchronized void resetBatching(String clientId,String subscriptionName){
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
+        if(topicSubContainer!=null){
+            topicSubContainer.reset();
+        }
     }
 
-    
-    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
-        
-        
+   
+    public Location checkpoint() throws IOException{
+       return null;
     }
 
-    
-    public void resetBatching(String clientId,String subscriptionName){
-      
-        
+
+    public synchronized void replayAcknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId){
+        String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
+        if(container!=null){
+            ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        ackContainer.remove(ref.getAckEntry());
+                        messageContainer.remove(tsa.getMessageEntry());
+                    }else{
+                        ackContainer.update(ref.getAckEntry(),tsa);
+                    }
+                }
+            }
+        }
     }
-}
\ No newline at end of file
+}
+
+
+
+
+
+
+
+
+

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java?view=auto&rev=477567
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java Tue Nov 21 00:11:03 2006
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import java.io.File;
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.rapid.RapidPersistenceAdapter;
+/**
+ * @version $Revision: 1.3 $
+ */
+public class RapidStoreQueueTest extends SimpleQueueTest{
+    
+        
+    protected void configureBroker(BrokerService answer) throws Exception{
+        
+    	File dataFileDir = new File("activemq-data/perfTest");
+        File journalDir = new File(dataFileDir, "journal").getCanonicalFile();
+        JournalImpl journal = new JournalImpl(journalDir, 3, 1024*1024*20);
+    	
+        RapidPersistenceAdapter adaptor = new RapidPersistenceAdapter(journal,answer.getTaskRunnerFactory());
+        
+        
+        answer.setPersistenceAdapter(adaptor);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+        
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RapidStoreQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native