You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/17 11:34:00 UTC

svn commit: r476101 [2/2] - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/cursors/ store/ store/jdbc/ store/jdbc/adapter/ store/journal/ store/kahadaptor/ store/memory/ store/rapid/

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.memory;
 
 import java.io.IOException;
@@ -24,7 +21,6 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -37,72 +33,76 @@
 /**
  * @version $Revision: 1.5 $
  */
-public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
+public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
 
     private Map ackDatabase;
     private Map subscriberDatabase;
     MessageId lastMessageId;
-    
-    public MemoryTopicMessageStore(ActiveMQDestination destination) {
-        this(destination, new LinkedHashMap(), makeMap(), makeMap());
+
+    public MemoryTopicMessageStore(ActiveMQDestination destination){
+        this(destination,new LinkedHashMap(),makeMap(),makeMap());
     }
-    protected static Map makeMap() {
+
+    protected static Map makeMap(){
         return Collections.synchronizedMap(new HashMap());
     }
-    
-    public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase, Map ackDatabase) {
-        super(destination, messageTable);
-        this.subscriberDatabase = subscriberDatabase;
-        this.ackDatabase = ackDatabase;
+
+    public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase,
+            Map ackDatabase){
+        super(destination,messageTable);
+        this.subscriberDatabase=subscriberDatabase;
+        this.ackDatabase=ackDatabase;
     }
 
-    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
-        super.addMessage(context, message);
-        lastMessageId = message.getMessageId();
+    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+        super.addMessage(context,message);
+        lastMessageId=message.getMessageId();
     }
 
-    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
-        ackDatabase.put(new SubscriptionKey(clientId, subscriptionName), messageId);
+    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
+            throws IOException{
+        ackDatabase.put(new SubscriptionKey(clientId,subscriptionName),messageId);
     }
 
-    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
-        return (SubscriptionInfo) subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
+    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
+        return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
     }
 
-    public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
-        SubscriptionInfo info = new SubscriptionInfo();
+    public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+            throws IOException{
+        SubscriptionInfo info=new SubscriptionInfo();
         info.setDestination(destination);
         info.setClientId(clientId);
         info.setSelector(selector);
         info.setSubcriptionName(subscriptionName);
-        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
-        subscriberDatabase.put(key, info);
-        MessageId l=retroactive ? null : lastMessageId;
-        if( l!=null ) {
-            ackDatabase.put(key, l);
+        SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+        subscriberDatabase.put(key,info);
+        MessageId l=retroactive?null:lastMessageId;
+        if(l!=null){
+            ackDatabase.put(key,l);
         }
     }
-    
-    public void deleteSubscription(String clientId, String subscriptionName) {
-        org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+
+    public void deleteSubscription(String clientId,String subscriptionName){
+        org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
         ackDatabase.remove(key);
         subscriberDatabase.remove(key);
     }
-    
+
     public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
-                    throws Exception{
-        MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
+            throws Exception{
+        MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
         boolean pastLastAck=lastAck==null;
         // the message table is a synchronizedMap - so just have to synchronize here
         synchronized(messageTable){
             for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
-                Map.Entry entry=(Entry) iter.next();
+                Map.Entry entry=(Entry)iter.next();
                 if(pastLastAck){
                     Object msg=entry.getValue();
                     if(msg.getClass()==String.class){
-                        listener.recoverMessageReference((String) msg);
+                        listener.recoverMessageReference((String)msg);
                     }else{
-                        listener.recoverMessage((Message) msg);
+                        listener.recoverMessage((Message)msg);
                     }
                 }else{
                     pastLastAck=entry.getKey().equals(lastAck);
@@ -111,92 +111,40 @@
             listener.finished();
         }
     }
-    
-    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
-        MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
-        boolean startFound=false;
-        // the message table is a synchronizedMap - so just have to synchronize here
-        synchronized(messageTable){
-            int count = 0;
-            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() && count < maxReturned;){
-                Map.Entry entry=(Entry) iter.next();
-               
-                    Object msg=entry.getValue();
-                    if(msg.getClass()==String.class){
-                        String ref=msg.toString();
-                        if(startFound||ref.equals(lastMessageId.toString())){
-                            startFound=true;
-                        }else if (startFound){
-                            listener.recoverMessageReference(ref);
-                            count++;
-                        }
-                    }else{
-                        Message message=(Message) msg;
-                        if(startFound||message.getMessageId().equals(lastMessageId)){
-                            startFound=true;
-                        }else if (startFound){
-                            listener.recoverMessage(message);
-                            count++;
-                        }
-                    }
-                
-            }
-            listener.finished();
-        }
-        
+
+    public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+            MessageRecoveryListener listener) throws Exception{
+        listener.finished();
     }
 
-    public void delete() {
+    public void delete(){
         super.delete();
         ackDatabase.clear();
         subscriberDatabase.clear();
         lastMessageId=null;
     }
-    
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-        return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
+
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+        return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
     }
+
     public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
-        // the message table is a synchronizedMap - so just have to synchronize here
-        boolean matchFound = false;
-        synchronized(messageTable){
-            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
-                Map.Entry entry=(Entry) iter.next();
-                if(!matchFound && entry.getKey().equals(id)){
-                    matchFound = true;
-                }else if (matchFound) {
-                    Message msg =  (Message) entry.getValue();
-                    return msg.getMessageId();
-                }
-            }
-        }
         return null;
     }
-    
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
-        // the message table is a synchronizedMap - so just have to synchronize here
-        Message last= null;
-        synchronized(messageTable){
-            for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
-                Map.Entry entry=(Entry) iter.next();
-                
-                if(entry.getKey().equals(id)){
-                    return last != null ? last.getMessageId() : null;
-                }else {
-                    last = (Message)entry.getValue();
-                }
-            }
-        }
+
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id)
+            throws IOException{
         return null;
     }
+
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
-        int result = 0;
-        MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
+        int result=0;
+        MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
         // the message table is a synchronizedMap - so just have to synchronize here
         synchronized(messageTable){
-            result = messageTable.size();
+            result=messageTable.size();
             for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
-                Map.Entry entry=(Entry) iter.next();
+                Map.Entry entry=(Entry)iter.next();
                 if(entry.getKey().equals(lastAck)){
                     break;
                 }
@@ -205,8 +153,14 @@
         }
         return result;
     }
-    
-    public void resetBatching(String clientId,String subscriptionName,MessageId id) {
+
+    public void resetBatching(String clientId,String subscriptionName,MessageId id){
+    }
+
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+            MessageRecoveryListener listener) throws Exception{
+    }
+
+    public void resetBatching(String clientId,String subscriptionName){
     }
-    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri Nov 17 02:33:57 2006
@@ -287,4 +287,16 @@
         }
     }
 
-}
+
+   
+    public int getMessageCount(){
+        return 0;
+    }
+
+    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+    }
+
+    public void resetBatching(){
+    }
+
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -20,6 +20,8 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activeio.journal.RecordLocation;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -38,8 +40,7 @@
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+
 
 /**
  * A MessageStore that uses a Journal to store it's messages.
@@ -312,44 +313,6 @@
         subscriberAcks.put(key,container);
     }
 
-    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
-            throws IOException{
-        MessageId result=null;
-        boolean getNext=false;
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer)subscriberAcks.get(key);
-        Iterator iter=list.iterator();
-        for(Iterator i=list.iterator();i.hasNext();){
-            String id=i.next().toString();
-            if(id.equals(messageId.toString())){
-                getNext=true;
-            }else if(getNext){
-                result=new MessageId(id);
-                break;
-            }
-        }
-        return result;
-    }
-
-    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
-            throws IOException{
-        MessageId result=null;
-        String previousId=null;
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer)subscriberAcks.get(key);
-        Iterator iter=list.iterator();
-        for(Iterator i=list.iterator();i.hasNext();){
-            String id=i.next().toString();
-            if(id.equals(messageId.toString())){
-                if(previousId!=null){
-                    result=new MessageId(previousId);
-                }
-                break;
-            }
-            previousId=id;
-        }
-        return result;
-    }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriberName);
@@ -359,4 +322,16 @@
 
     public void resetBatching(String clientId,String subscriptionName,MessageId nextId){
     }
-}
+
+    
+    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
+        
+        
+    }
+
+    
+    public void resetBatching(String clientId,String subscriptionName){
+      
+        
+    }
+}
\ No newline at end of file