You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/17 11:34:00 UTC
svn commit: r476101 [2/2] - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/region/cursors/ store/ store/jdbc/ store/jdbc/adapter/
store/journal/ store/kahadaptor/ store/memory/ store/rapid/
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -1,20 +1,17 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.store.memory;
import java.io.IOException;
@@ -24,7 +21,6 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -37,72 +33,76 @@
/**
* @version $Revision: 1.5 $
*/
-public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
+public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore{
private Map ackDatabase;
private Map subscriberDatabase;
MessageId lastMessageId;
-
- public MemoryTopicMessageStore(ActiveMQDestination destination) {
- this(destination, new LinkedHashMap(), makeMap(), makeMap());
+
+ public MemoryTopicMessageStore(ActiveMQDestination destination){
+ this(destination,new LinkedHashMap(),makeMap(),makeMap());
}
- protected static Map makeMap() {
+
+ protected static Map makeMap(){
return Collections.synchronizedMap(new HashMap());
}
-
- public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase, Map ackDatabase) {
- super(destination, messageTable);
- this.subscriberDatabase = subscriberDatabase;
- this.ackDatabase = ackDatabase;
+
+ public MemoryTopicMessageStore(ActiveMQDestination destination,Map messageTable,Map subscriberDatabase,
+ Map ackDatabase){
+ super(destination,messageTable);
+ this.subscriberDatabase=subscriberDatabase;
+ this.ackDatabase=ackDatabase;
}
- public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
- super.addMessage(context, message);
- lastMessageId = message.getMessageId();
+ public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+ super.addMessage(context,message);
+ lastMessageId=message.getMessageId();
}
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
- ackDatabase.put(new SubscriptionKey(clientId, subscriptionName), messageId);
+ public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,MessageId messageId)
+ throws IOException{
+ ackDatabase.put(new SubscriptionKey(clientId,subscriptionName),messageId);
}
- public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
- return (SubscriptionInfo) subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
+ public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
+ return (SubscriptionInfo)subscriberDatabase.get(new SubscriptionKey(clientId,subscriptionName));
}
- public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
- SubscriptionInfo info = new SubscriptionInfo();
+ public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ throws IOException{
+ SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
- SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
- subscriberDatabase.put(key, info);
- MessageId l=retroactive ? null : lastMessageId;
- if( l!=null ) {
- ackDatabase.put(key, l);
+ SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+ subscriberDatabase.put(key,info);
+ MessageId l=retroactive?null:lastMessageId;
+ if(l!=null){
+ ackDatabase.put(key,l);
}
}
-
- public void deleteSubscription(String clientId, String subscriptionName) {
- org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+
+ public void deleteSubscription(String clientId,String subscriptionName){
+ org.apache.activemq.util.SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
ackDatabase.remove(key);
subscriberDatabase.remove(key);
}
-
+
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
- throws Exception{
- MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
+ throws Exception{
+ MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
boolean pastLastAck=lastAck==null;
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
- Map.Entry entry=(Entry) iter.next();
+ Map.Entry entry=(Entry)iter.next();
if(pastLastAck){
Object msg=entry.getValue();
if(msg.getClass()==String.class){
- listener.recoverMessageReference((String) msg);
+ listener.recoverMessageReference((String)msg);
}else{
- listener.recoverMessage((Message) msg);
+ listener.recoverMessage((Message)msg);
}
}else{
pastLastAck=entry.getKey().equals(lastAck);
@@ -111,92 +111,40 @@
listener.finished();
}
}
-
- public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,MessageRecoveryListener listener) throws Exception{
- MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
- boolean startFound=false;
- // the message table is a synchronizedMap - so just have to synchronize here
- synchronized(messageTable){
- int count = 0;
- for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext() && count < maxReturned;){
- Map.Entry entry=(Entry) iter.next();
-
- Object msg=entry.getValue();
- if(msg.getClass()==String.class){
- String ref=msg.toString();
- if(startFound||ref.equals(lastMessageId.toString())){
- startFound=true;
- }else if (startFound){
- listener.recoverMessageReference(ref);
- count++;
- }
- }else{
- Message message=(Message) msg;
- if(startFound||message.getMessageId().equals(lastMessageId)){
- startFound=true;
- }else if (startFound){
- listener.recoverMessage(message);
- count++;
- }
- }
-
- }
- listener.finished();
- }
-
+
+ public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
+ MessageRecoveryListener listener) throws Exception{
+ listener.finished();
}
- public void delete() {
+ public void delete(){
super.delete();
ackDatabase.clear();
subscriberDatabase.clear();
lastMessageId=null;
}
-
- public SubscriptionInfo[] getAllSubscriptions() throws IOException {
- return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
+
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+ return (SubscriptionInfo[])subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
+
public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
- // the message table is a synchronizedMap - so just have to synchronize here
- boolean matchFound = false;
- synchronized(messageTable){
- for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
- Map.Entry entry=(Entry) iter.next();
- if(!matchFound && entry.getKey().equals(id)){
- matchFound = true;
- }else if (matchFound) {
- Message msg = (Message) entry.getValue();
- return msg.getMessageId();
- }
- }
- }
return null;
}
-
- public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
- // the message table is a synchronizedMap - so just have to synchronize here
- Message last= null;
- synchronized(messageTable){
- for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
- Map.Entry entry=(Entry) iter.next();
-
- if(entry.getKey().equals(id)){
- return last != null ? last.getMessageId() : null;
- }else {
- last = (Message)entry.getValue();
- }
- }
- }
+
+ public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id)
+ throws IOException{
return null;
}
+
public int getMessageCount(String clientId,String subscriberName) throws IOException{
- int result = 0;
- MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
+ int result=0;
+ MessageId lastAck=(MessageId)ackDatabase.get(new SubscriptionKey(clientId,subscriberName));
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
- result = messageTable.size();
+ result=messageTable.size();
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
- Map.Entry entry=(Entry) iter.next();
+ Map.Entry entry=(Entry)iter.next();
if(entry.getKey().equals(lastAck)){
break;
}
@@ -205,8 +153,14 @@
}
return result;
}
-
- public void resetBatching(String clientId,String subscriptionName,MessageId id) {
+
+ public void resetBatching(String clientId,String subscriptionName,MessageId id){
+ }
+
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
+ MessageRecoveryListener listener) throws Exception{
+ }
+
+ public void resetBatching(String clientId,String subscriptionName){
}
-
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri Nov 17 02:33:57 2006
@@ -287,4 +287,16 @@
}
}
-}
+
+
+ public int getMessageCount(){
+ return 0;
+ }
+
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+ }
+
+ public void resetBatching(){
+ }
+
+}
\ No newline at end of file
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=476101&r1=476100&r2=476101
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Nov 17 02:33:57 2006
@@ -20,6 +20,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
@@ -38,8 +40,7 @@
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+
/**
* A MessageStore that uses a Journal to store it's messages.
@@ -312,44 +313,6 @@
subscriberAcks.put(key,container);
}
- public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
- throws IOException{
- MessageId result=null;
- boolean getNext=false;
- String key=getSubscriptionKey(clientId,subscriptionName);
- ListContainer list=(ListContainer)subscriberAcks.get(key);
- Iterator iter=list.iterator();
- for(Iterator i=list.iterator();i.hasNext();){
- String id=i.next().toString();
- if(id.equals(messageId.toString())){
- getNext=true;
- }else if(getNext){
- result=new MessageId(id);
- break;
- }
- }
- return result;
- }
-
- public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
- throws IOException{
- MessageId result=null;
- String previousId=null;
- String key=getSubscriptionKey(clientId,subscriptionName);
- ListContainer list=(ListContainer)subscriberAcks.get(key);
- Iterator iter=list.iterator();
- for(Iterator i=list.iterator();i.hasNext();){
- String id=i.next().toString();
- if(id.equals(messageId.toString())){
- if(previousId!=null){
- result=new MessageId(previousId);
- }
- break;
- }
- previousId=id;
- }
- return result;
- }
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
@@ -359,4 +322,16 @@
public void resetBatching(String clientId,String subscriptionName,MessageId nextId){
}
-}
+
+
+ public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,MessageRecoveryListener listener) throws Exception{
+
+
+ }
+
+
+ public void resetBatching(String clientId,String subscriptionName){
+
+
+ }
+}
\ No newline at end of file