You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/08/06 15:25:29 UTC

svn commit: r683259 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/group/ test/java/org/apache/activemq/group/

Author: rajdavies
Date: Wed Aug  6 06:25:27 2008
New Revision: 683259

URL: http://svn.apache.org/viewvc?rev=683259&view=rev
Log:
Ensure ordered access to the group and add expiration to state in 
the GroupMap

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java
      - copied, changed from r679849, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java   (with props)
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java Wed Aug  6 06:25:27 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Return information about map update
+ * 
+ */
+public class AsyncMapRequest implements RequestCallback{
+    private final Object mutex = new Object();
+    
+    private Set<String> requests = new HashSet<String>();
+
+    public void add(String id, MapRequest request) {
+        request.setCallback(this);
+        this.requests.add(id);
+    }
+    
+    /**
+     * Wait for requests
+     * @param timeout
+     * @return
+     */
+    public boolean isSuccess(long timeout) {
+        long deadline = System.currentTimeMillis() + timeout;
+        while (!this.requests.isEmpty()) {
+            synchronized (this.mutex) {
+                try {
+                    this.mutex.wait(timeout);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+        }
+        return this.requests.isEmpty();
+    }
+
+    
+    public void finished(String id) {
+        this.requests.remove(id);
+        
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java Wed Aug  6 06:25:27 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+/**
+ * Default implementation of a MapChangedListener
+ *
+ */
+public class DefaultMapChangedListener implements MapChangedListener{
+
+    public void mapInsert(Member owner, Object key, Object value) {        
+    }
+
+    public void mapRemove(Member owner, Object key, Object value,boolean expired) {        
+    }
+
+    public void mapUpdate(Member owner, Object Key, Object oldValue,Object newValue) {        
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java Wed Aug  6 06:25:27 2008
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Used to pass information around
+ *
+ */
+public class ElectionMessage implements Externalizable{
+    static enum MessageType{ELECTION,ANSWER,COORDINATOR};
+    private Member member;
+    private MessageType type;
+    
+    /**
+     * @return the member
+     */
+    public Member getMember() {
+        return this.member;
+    }
+
+    /**
+     * @param member the member to set
+     */
+    public void setMember(Member member) {
+        this.member = member;
+    }
+
+    /**
+     * @return the type
+     */
+    public MessageType getType() {
+        return this.type;
+    }
+
+    /**
+     * @param type the type to set
+     */
+    public void setType(MessageType type) {
+        this.type = type;
+    }
+    
+    /**
+     * @return true if election message
+     */
+    public boolean isElection() {
+        return this.type != null && this.type.equals(MessageType.ELECTION);
+    }
+    
+    /**
+     * @return true if answer message
+     */
+    public boolean isAnswer() {
+        return this.type != null && this.type.equals(MessageType.ANSWER);
+    }
+    
+    /**
+     * @return true if coordinator message
+     */
+    public boolean isCoordinator() {
+        return this.type != null && this.type.equals(MessageType.COORDINATOR);
+    }
+    
+        
+    public ElectionMessage copy() {
+        ElectionMessage result = new ElectionMessage();
+        result.member=this.member;
+        result.type=this.type;
+        return result;
+    }
+    
+    
+    public void readExternal(ObjectInput in) throws IOException,
+            ClassNotFoundException {
+        this.member=(Member) in.readObject();
+        this.type=(MessageType) in.readObject();
+    }
+    
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(this.member);
+        out.writeObject(this.type);
+    }
+    
+    public String toString() {
+        return "ElectionMessage: "+ this.member + "{"+this.type+ "}";
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java Wed Aug  6 06:25:27 2008
@@ -30,6 +30,7 @@
     private K key;
     private boolean share;
     private boolean removeOnExit;
+    private long expiration;
 
     /**
      * Default constructor - for serialization
@@ -88,6 +89,38 @@
     public void setRemoveOnExit(boolean removeOnExit) {
         this.removeOnExit = removeOnExit;
     }
+    
+    /**
+     * @return the expiration
+     */
+    public long getExpiration() {
+        return expiration;
+    }
+
+    /**
+     * @param expiration the expiration to set
+     */
+    public void setExpiration(long expiration) {
+        this.expiration = expiration;
+    }
+    
+    void setTimeToLive(long ttl) {
+        if (ttl > 0 ) {
+            this.expiration=ttl+System.currentTimeMillis();
+        }else {
+            this.expiration =0l;
+        }
+    }
+    
+    boolean isExpired() {
+        return isExpired(System.currentTimeMillis());
+    }
+    
+    boolean isExpired(long currentTime) {
+        return this.expiration > 0 && this.expiration < currentTime;
+    }
+    
+   
 
     public boolean equals(Object obj) {
         boolean result = false;
@@ -103,6 +136,7 @@
         out.writeObject(this.key);
         out.writeBoolean(isShare());
         out.writeBoolean(isRemoveOnExit());
+        out.writeLong(getExpiration());
     }
 
     public void readExternal(ObjectInput in) throws IOException,
@@ -111,5 +145,10 @@
         this.key = (K) in.readObject();
         this.share = in.readBoolean();
         this.removeOnExit=in.readBoolean();
+        this.expiration=in.readLong();
+    }
+    
+    public String toString() {
+        return "key:"+this.key;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java Wed Aug  6 06:25:27 2008
@@ -26,19 +26,21 @@
  *
  */
 public class EntryMessage implements Externalizable{
-    static enum MessageType{INSERT,DELETE};
+    static enum MessageType{INSERT,DELETE,SYNC};
     private EntryKey key;
     private Object value;
     private MessageType type;
+    private boolean mapUpdate;
+    private boolean expired;
     
     /**
      * @return the owner
      */
     public EntryKey getKey() {
-        return key;
+        return this.key;
     }
     /**
-     * @param owner the owner to set
+     * @param key
      */
     public void setKey(EntryKey key) {
         this.key = key;
@@ -47,7 +49,7 @@
      * @return the value
      */
     public Object getValue() {
-        return value;
+        return this.value;
     }
     /**
      * @param value the value to set
@@ -60,7 +62,7 @@
      * @return the type
      */
     public MessageType getType() {
-        return type;
+        return this.type;
     }
     /**
      * @param type the type to set
@@ -69,18 +71,81 @@
         this.type = type;
     }
     
+    /**
+     * @return the mapUpdate
+     */
+    public boolean isMapUpdate() {
+        return this.mapUpdate;
+    }
+    /**
+     * @param mapUpdate the mapUpdate to set
+     */
+    public void setMapUpdate(boolean mapUpdate) {
+        this.mapUpdate = mapUpdate;
+    }
+    
+    /**
+     * @return the expired
+     */
+    public boolean isExpired() {
+        return expired;
+    }
+    /**
+     * @param expired the expired to set
+     */
+    public void setExpired(boolean expired) {
+        this.expired = expired;
+    }
+    
+    /**
+     * @return if insert message
+     */
+    public boolean isInsert() {
+        return this.type != null && this.type.equals(MessageType.INSERT);
+    }
+    
+    /**
+     * @return true if delete message
+     */
+    public boolean isDelete() {
+        return this.type != null && this.type.equals(MessageType.DELETE);
+    }
+    
+    public boolean isSync() {
+        return this.type != null && this.type.equals(MessageType.SYNC);
+    }
+    
+    public EntryMessage copy() {
+        EntryMessage result = new EntryMessage();
+        result.key=this.key;
+        result.value=this.value;
+        result.type=this.type;
+        result.mapUpdate=this.mapUpdate;
+        result.expired=this.expired;
+        return result;
+    }
+    
     
     
     public void readExternal(ObjectInput in) throws IOException,
             ClassNotFoundException {
         this.key=(EntryKey) in.readObject();
         this.value=in.readObject();
-        this.type=(MessageType) in.readObject();        
+        this.type=(MessageType) in.readObject();  
+        this.mapUpdate=in.readBoolean();
+        this.expired=in.readBoolean();
     }
     
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(this.key);
         out.writeObject(this.value);
         out.writeObject(this.type);
+        out.writeBoolean(this.mapUpdate);
+        out.writeBoolean(this.expired);
+    }
+    
+    public String toString() {
+        return "EntryMessage: "+this.type + "[" + this.key + "," + this.value +
+            "]{update=" + this.mapUpdate + "}";
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java Wed Aug  6 06:25:27 2008
@@ -45,6 +45,14 @@
         return this.value;
     }
     
+    /**
+     * set the value
+     * @param value
+     */
+    public void setValue(V value) {
+        this.value=value;
+    }
+    
     public int hashCode() {
         return this.value != null ? this.value.hashCode() : super.hashCode();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java Wed Aug  6 06:25:27 2008
@@ -27,6 +27,9 @@
 import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -46,40 +49,60 @@
 import org.apache.activemq.advisory.ConsumerListener;
 import org.apache.activemq.thread.SchedulerTimerTask;
 import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.LRUSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
-
 /**
  * <P>
- * A <CODE>GroupMap</CODE> is used to shared state amongst a distributed
- * group. You can restrict ownership of objects inserted into the map,
- * by allowing only the map that inserted the objects to update or remove them
+ * A <CODE>GroupMap</CODE> is a Map implementation that is used to shared state 
+ * amongst a distributed group of other <CODE>GroupMap</CODE> instances. 
+ * Membership of a group is handled automatically using discovery.
+ * <P>
+ * The underlying transport is JMS and there are some optimizations that occur
+ * for membership if used with ActiveMQ - but <CODE>GroupMap</CODE> can be used
+ * with any JMS implementation.
+ * 
+ * <P>
+ * Updates to the group shared map are controlled by a coordinator. The
+ * coordinator is elected by the member with the lowest lexicographical id - 
+ * based on the bully algorithm [Silberschatz et al. 1993]
+ * <P>
+ * The {@link #selectCordinator(Collection<Member> members)} method may be
+ * overridden to implement a custom mechanism for choosing  how the coordinator
+ * is elected for the map.
  * <P>
- * Updates to the group shared map are controlled by a co-ordinator.
- * The co-ordinator is chosen by the member with the lowest lexicographical id .
- * <P>The {@link #selectCordinator(Collection<Member> members)} method may be overridden to 
- * implement a custom mechanism for choosing the co-ordinator
- * are added to the map.
+ * New <CODE>GroupMap</CODE> instances have their state updated by the coordinator,
+ * and coordinator failure is handled automatically within the group.
  * <P>
+ * All updates are totally ordered through the coordinator, whilst read operations 
+ * happen locally. 
+ * <P>
+ * A <CODE>GroupMap</CODE>supports the concept of owner only updates(write locks), 
+ * shared updates, entry expiration times and removal on owner exit - 
+ * all of which are optional.
+ * 
+ * <P>
+ * 
  * @param <K> the key type
  * @param <V> the value type
- *
+ * 
  */
-public class GroupMap<K, V> implements Map<K, V>,Service{
+public class GroupMap<K, V> implements Map<K, V>, Service {
+    /**
+     * default interval within which to detect a member failure
+     */
+    public static final long DEFAULT_HEART_BEAT_INTERVAL = 2000;
+    private static final long EXPIRATION_SWEEP_INTERVAL = 1000;
     private static final Log LOG = LogFactory.getLog(GroupMap.class);
-    private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName()+".";
-    private static final int HEART_BEAT_INTERVAL = 15000; 
+    private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName()
+            + ".";
     private final Object mapMutex = new Object();
-    private Map<EntryKey<K>,EntryValue<V>> localMap;
-    private Map<String,Member> members = new ConcurrentHashMap<String,Member>();
-    private Map<String,MapRequest> requests = new HashMap<String,MapRequest>();
-    private List <MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
-    private List <MapChangedListener> mapChangedListeners = new CopyOnWriteArrayList<MapChangedListener>();
-    private LRUSet<Message>mapUpdateReplies = new LRUSet<Message>();
-    private Member local;
+    private Map<EntryKey<K>, EntryValue<V>> localMap;
+    private Map<String, Member> members = new ConcurrentHashMap<String, Member>();
+    private Map<String, MapRequest> requests = new HashMap<String, MapRequest>();
+    private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
+    private List<MapChangedListener> mapChangedListeners = new CopyOnWriteArrayList<MapChangedListener>();
+    Member local;
     private Member coordinator;
     private String groupName;
     private boolean sharedWrites;
@@ -93,93 +116,122 @@
     private AtomicBoolean started = new AtomicBoolean();
     private SchedulerTimerTask heartBeatTask;
     private SchedulerTimerTask checkMembershipTask;
-    private Timer heartBeatTimer;
-    private int heartBeatInterval = HEART_BEAT_INTERVAL;
-    private IdGenerator requestGenerator = new IdGenerator();
+    private SchedulerTimerTask expirationTask;
+    private Timer timer;
+    private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL;
+    private IdGenerator idGenerator = new IdGenerator();
     private boolean removeOwnedObjectsOnExit;
-    
+    private int timeToLive;
+    private int minimumGroupSize = 1;
+    private final AtomicBoolean electionFinished = new AtomicBoolean(true);
+    private ExecutorService executor;
+    private final Object memberMutex = new Object();
+
     /**
      * @param connection
      * @param name
      */
-    public GroupMap(Connection connection,String name) {
-        this(connection,"default",name);
+    public GroupMap(Connection connection, String name) {
+        this(connection, "default", name);
     }
-    
+
     /**
      * @param connection
      * @param groupName
      * @param name
      */
-    public GroupMap(Connection connection,String groupName,String name) {
+    public GroupMap(Connection connection, String groupName, String name) {
         this.connection = connection;
         this.local = new Member(name);
-        this.coordinator=this.local;
-        this.groupName=groupName;
+        this.coordinator = this.local;
+        this.groupName = groupName;
     }
-    
+
     /**
-     * Set the local map implementation to be used
-     * By default its a HashMap - but you could use a Cache for example
+     * Set the local map implementation to be used By default its a HashMap -
+     * but you could use a Cache for example
+     * 
      * @param map
      */
     public void setLocalMap(Map map) {
-        synchronized(this.mapMutex) {
-            this.localMap=map;
+        synchronized (this.mapMutex) {
+            this.localMap = map;
         }
     }
-    
+
     /**
      * Start membership to the group
-     * @throws Exception 
+     * 
+     * @throws Exception
      * 
      */
     public void start() throws Exception {
-        if(this.started.compareAndSet(false, true)) {
-            synchronized(this.mapMutex) {
-                if (this.localMap==null) {
-                    this.localMap= new HashMap<EntryKey<K>, EntryValue<V>>();
+        if (this.started.compareAndSet(false, true)) {
+            synchronized (this.mapMutex) {
+                if (this.localMap == null) {
+                    this.localMap = new HashMap<EntryKey<K>, EntryValue<V>>();
                 }
             }
             this.connection.start();
-            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            this.session = this.connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
             this.producer = this.session.createProducer(null);
             this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
             this.inboxTopic = this.session.createTemporaryTopic();
-            String topicName = STATE_TOPIC_PREFIX+this.groupName;
+            String topicName = STATE_TOPIC_PREFIX + this.groupName;
             this.topic = this.session.createTopic(topicName);
-            this.heartBeatTopic = this.session.createTopic(topicName+".heartbeat");
-            MessageConsumer privateInbox = this.session.createConsumer(this.inboxTopic);
-            privateInbox.setMessageListener(new MessageListener(){
+            this.heartBeatTopic = this.session.createTopic(topicName
+                    + ".heartbeat");
+            MessageConsumer privateInbox = this.session
+                    .createConsumer(this.inboxTopic);
+            privateInbox.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
-                    handleResponses(message);
+                    processMessage(message);
                 }
             });
-            ActiveMQMessageConsumer mapChangeConsumer = (ActiveMQMessageConsumer) this.session.createConsumer(this.topic);
-            mapChangeConsumer.setMessageListener(new MessageListener(){
+            MessageConsumer mapChangeConsumer = this.session
+                    .createConsumer(this.topic);
+            mapChangeConsumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
-                   handleMapUpdates(message);
+                    processMessage(message);
                 }
             });
-            
-            MessageConsumer heartBeatConsumer = this.session.createConsumer(this.heartBeatTopic);
+            MessageConsumer heartBeatConsumer = this.session
+                    .createConsumer(this.heartBeatTopic);
             heartBeatConsumer.setMessageListener(new MessageListener() {
                 public void onMessage(Message message) {
                     handleHeartbeats(message);
-                 }
+                }
             });
-           
-            this.consumerEvents = new ConsumerEventSource(this.connection, this.topic);
+            this.consumerEvents = new ConsumerEventSource(this.connection,
+                    this.topic);
             this.consumerEvents.setConsumerListener(new ConsumerListener() {
                 public void onConsumerEvent(ConsumerEvent event) {
                     handleConsumerEvents(event);
-                }  
+                }
             });
             this.consumerEvents.start();
-            this.local.setId(mapChangeConsumer.getConsumerId().toString());
+            String memberId = null;
+            if (mapChangeConsumer instanceof ActiveMQMessageConsumer) {
+                memberId = ((ActiveMQMessageConsumer) mapChangeConsumer)
+                        .getConsumerId().toString();
+            } else {
+                memberId = this.idGenerator.generateId();
+            }
+            this.local.setId(memberId);
             this.local.setInBoxDestination(this.inboxTopic);
+            this.executor = Executors
+                    .newSingleThreadExecutor(new ThreadFactory() {
+                        public Thread newThread(Runnable runnable) {
+                            Thread thread = new Thread(runnable, "Election{"
+                                    + GroupMap.this.local + "}");
+                            thread.setDaemon(true);
+                            thread.setPriority(Thread.NORM_PRIORITY);
+                            return thread;
+                        }
+                    });
             sendHeartBeat();
-            this.heartBeatTask = new SchedulerTimerTask (new Runnable() {
+            this.heartBeatTask = new SchedulerTimerTask(new Runnable() {
                 public void run() {
                     sendHeartBeat();
                 }
@@ -189,33 +241,76 @@
                     checkMembership();
                 }
             });
-            this.heartBeatTimer = new Timer("Distributed heart beat",true);
-            this.heartBeatTimer.scheduleAtFixedRate(this.heartBeatTask, getHeartBeatInterval()/3, getHeartBeatInterval()/2);
             
+            this.expirationTask = new SchedulerTimerTask(new Runnable() {
+                public void run() {
+                    expirationSweep();
+                }
+            });
+            this.timer = new Timer("Distributed heart beat", true);
+            this.timer.scheduleAtFixedRate(this.heartBeatTask,
+                    getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
+            this.timer.scheduleAtFixedRate(this.checkMembershipTask,
+                    getHeartBeatInterval(), getHeartBeatInterval());
+            this.timer.scheduleAtFixedRate(this.expirationTask,
+                    EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
+            // await for members to join
+            long timeout = this.heartBeatInterval * this.minimumGroupSize;
+            long deadline = System.currentTimeMillis() + timeout;
+            while (this.members.size() < this.minimumGroupSize && timeout > 0) {
+                synchronized (this.memberMutex) {
+                    this.memberMutex.wait(timeout);
+                }
+                timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+            }
         }
     }
 
     /**
      * stop membership to the group
-     * @throws Exception 
+     * 
+     * @throws Exception
      */
-    public void stop() throws Exception {
+    public void stop() {
         if (this.started.compareAndSet(true, false)) {
+            this.expirationTask.cancel();
             this.checkMembershipTask.cancel();
             this.heartBeatTask.cancel();
-            this.heartBeatTimer.purge();
-            this.consumerEvents.stop();
-            this.session.close();
+            this.expirationTask.cancel();
+            this.timer.purge();
+            if (this.executor != null) {
+                this.executor.shutdownNow();
+            }
+            try {
+                this.consumerEvents.stop();
+                this.session.close();
+            } catch (Exception e) {
+                LOG.debug("Caught exception stopping", e);
+            }
         }
     }
-    
+
+    /**
+     * @return true if there is elections have finished
+     */
+    public boolean isElectionFinished() {
+        return this.electionFinished.get();
+    }
+
     /**
      * @return the partitionName
      */
     public String getGroupName() {
         return this.groupName;
     }
-    
+
+    /**
+     * @return the name ofthis map
+     */
+    public String getName() {
+        return this.local.getName();
+    }
+
     /**
      * @return the sharedWrites
      */
@@ -224,112 +319,146 @@
     }
 
     /**
-     * @param sharedWrites the sharedWrites to set
+     * @param sharedWrites
+     *            the sharedWrites to set
      */
     public void setSharedWrites(boolean sharedWrites) {
         this.sharedWrites = sharedWrites;
     }
-    
+
     /**
      * @return the heartBeatInterval
      */
-    public int getHeartBeatInterval() {
+    public long getHeartBeatInterval() {
         return this.heartBeatInterval;
     }
 
     /**
-     * @param heartBeatInterval the heartBeatInterval to set
+     * @param heartBeatInterval
+     *            the heartBeatInterval to set
      */
-    public void setHeartBeatInterval(int heartBeatInterval) {
+    public void setHeartBeatInterval(long heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
     }
-    
-    
+
     /**
      * @param l
      */
     public void addMemberChangedListener(MemberChangedListener l) {
         this.membershipListeners.add(l);
     }
-    
+
     /**
      * @param l
      */
     public void removeMemberChangedListener(MemberChangedListener l) {
         this.membershipListeners.remove(l);
     }
-    
+
     /**
      * @param l
      */
     public void addMapChangedListener(MapChangedListener l) {
         this.mapChangedListeners.add(l);
     }
-    
+
     /**
      * @param l
      */
     public void removeMapChangedListener(MapChangedListener l) {
         this.mapChangedListeners.remove(l);
     }
-    
+
+    /**
+     * @return the timeToLive
+     */
+    public int getTimeToLive() {
+        return this.timeToLive;
+    }
+
+    /**
+     * @param timeToLive
+     *            the timeToLive to set
+     */
+    public void setTimeToLive(int timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
     /**
      * @return the removeOwnedObjectsOnExit
      */
     public boolean isRemoveOwnedObjectsOnExit() {
-        return removeOwnedObjectsOnExit;
+        return this.removeOwnedObjectsOnExit;
     }
 
     /**
-     * Sets the policy for owned objects in the group
-     * If set to true, when this <code>GroupMap<code> stops,
+     * Sets the policy for owned objects in the group If set to true, when this
+     * <code>GroupMap<code> stops,
      * any objects it owns will be removed from the group map
      * @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set
      */
     public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
         this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
     }
-   
+
+    /**
+     * @return the minimumGroupSize
+     */
+    public int getMinimumGroupSize() {
+        return this.minimumGroupSize;
+    }
+
+    /**
+     * @param minimumGroupSize
+     *            the minimumGroupSize to set
+     */
+    public void setMinimumGroupSize(int minimumGroupSize) {
+        this.minimumGroupSize = minimumGroupSize;
+    }
+
     /**
      * clear entries from the Map
-     * @throws IllegalStateException 
+     * 
+     * @throws IllegalStateException
      */
     public void clear() throws IllegalStateException {
-        checkStarted();
-        if(this.localMap != null && !this.localMap.isEmpty()) {
+        checkStatus();
+        if (this.localMap != null && !this.localMap.isEmpty()) {
             Set<EntryKey<K>> keys = null;
-            synchronized(this.mapMutex) {
+            synchronized (this.mapMutex) {
                 keys = new HashSet<EntryKey<K>>(this.localMap.keySet());
-                this.localMap.clear();
             }
-            
-            for(EntryKey<K>key:keys) {
+            for (EntryKey<K> key : keys) {
                 remove(key);
             }
-        }        
+        }
+        this.localMap.clear();
     }
-    
-    
+
     public boolean containsKey(Object key) {
-        EntryKey stateKey = new EntryKey(this.local,key);
-        synchronized(this.mapMutex) {
-            return this.localMap != null ? this.localMap.containsKey(stateKey):false;
+        EntryKey stateKey = new EntryKey(this.local, key);
+        synchronized (this.mapMutex) {
+            return this.localMap != null ? this.localMap.containsKey(stateKey)
+                    : false;
         }
     }
 
     public boolean containsValue(Object value) {
-        EntryValue entryValue = new EntryValue(this.local,value);
-        synchronized(this.mapMutex) {
-            return this.localMap != null ? this.localMap.containsValue(entryValue):false;
+        EntryValue entryValue = new EntryValue(this.local, value);
+        synchronized (this.mapMutex) {
+            return this.localMap != null ? this.localMap
+                    .containsValue(entryValue) : false;
         }
     }
 
     public Set<java.util.Map.Entry<K, V>> entrySet() {
-        Map<K,V>result = new HashMap<K,V>();
-        synchronized(this.mapMutex) {
-            if(this.localMap!=null) {
-                for(java.util.Map.Entry<EntryKey<K>,EntryValue<V>>entry:this.localMap.entrySet()) {
-                    result.put(entry.getKey().getKey(),entry.getValue().getValue());
+        Map<K, V> result = new HashMap<K, V>();
+        synchronized (this.mapMutex) {
+            if (this.localMap != null) {
+                for (java.util.Map.Entry<EntryKey<K>, EntryValue<V>> entry : this.localMap
+                        .entrySet()) {
+                    result.put(entry.getKey().getKey(), entry.getValue()
+                            .getValue());
                 }
             }
         }
@@ -337,150 +466,170 @@
     }
 
     public V get(Object key) {
-       EntryKey<K> stateKey = new EntryKey<K>(this.local,(K) key);
-       EntryValue<V> value = null;
-       synchronized(this.mapMutex) {
-           value = this.localMap != null ?this.localMap.get(stateKey):null;
-       }
-       return value != null ? value.getValue() : null;
+        EntryKey<K> stateKey = new EntryKey<K>(this.local, (K) key);
+        EntryValue<V> value = null;
+        synchronized (this.mapMutex) {
+            value = this.localMap != null ? this.localMap.get(stateKey) : null;
+        }
+        return value != null ? value.getValue() : null;
     }
 
     public boolean isEmpty() {
-        synchronized(this.mapMutex) {
-            return this.localMap != null ? this.localMap.isEmpty():true;
+        synchronized (this.mapMutex) {
+            return this.localMap != null ? this.localMap.isEmpty() : true;
         }
     }
 
     public Set<K> keySet() {
-        Set <K>result = new HashSet<K>();
-        synchronized(this.mapMutex) {
-            if(this.localMap!=null) {
-                for (EntryKey<K> key:this.localMap.keySet()) {
+        Set<K> result = new HashSet<K>();
+        synchronized (this.mapMutex) {
+            if (this.localMap != null) {
+                for (EntryKey<K> key : this.localMap.keySet()) {
                     result.add(key.getKey());
                 }
             }
         }
         return result;
     }
+
     /**
      * Puts an value into the map associated with the key
-     * @param key 
-     * @param value 
+     * 
+     * @param key
+     * @param value
      * @return the old value or null
-     * @throws IllegalAccessException 
-     * @throws IllegalStateException 
+     * @throws GroupMapUpdateException
+     * @throws IllegalStateException
      * 
      */
-    public V put(K key, V value) throws IllegalAccessException,IllegalStateException{
-        return put(key,value,isSharedWrites(),isRemoveOwnedObjectsOnExit());
+    public V put(K key, V value) throws GroupMapUpdateException,
+            IllegalStateException {
+        return put(key, value, isSharedWrites(), isRemoveOwnedObjectsOnExit(),
+                getTimeToLive());
     }
-    
+
     /**
      * Puts an value into the map associated with the key
-     * @param key 
-     * @param value 
-     * @param sharedWrites 
-     * @param removeOnExit 
+     * 
+     * @param key
+     * @param value
+     * @param sharedWrites
+     * @param removeOnExit
+     * @param timeToLive
      * @return the old value or null
-     * @throws IllegalAccessException 
-     * @throws IllegalStateException 
+     * @throws GroupMapUpdateException
+     * @throws IllegalStateException
      * 
      */
-    public V put(K key, V value,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException, IllegalStateException{
-        checkStarted();
-        EntryKey<K>entryKey = new EntryKey<K>(this.local,key);
-        EntryValue<V>stateValue = new EntryValue<V>(this.local,value);
+    public V put(K key, V value, boolean sharedWrites, boolean removeOnExit,
+            long timeToLive) throws GroupMapUpdateException,
+            IllegalStateException {
+        checkStatus();
+        EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
+        EntryValue<V> stateValue = new EntryValue<V>(this.local, value);
         entryKey.setShare(sharedWrites);
         entryKey.setRemoveOnExit(removeOnExit);
+        entryKey.setTimeToLive(timeToLive);
         EntryMessage entryMsg = new EntryMessage();
         entryMsg.setKey(entryKey);
         entryMsg.setValue(value);
         entryMsg.setType(EntryMessage.MessageType.INSERT);
-        return sendEntryMessage(entryMsg);
+        return (V) sendRequest(getCoordinator(), entryMsg);
     }
-    
+
     /**
      * Add the Map to the distribution
+     * 
      * @param t
-     * @throws IllegalAccessException
+     * @throws GroupMapUpdateException
      * @throws IllegalStateException
      */
-    public void putAll(Map<? extends K, ? extends V> t) throws IllegalAccessException,IllegalStateException {
-        putAll(t,isSharedWrites(),isRemoveOwnedObjectsOnExit());
+    public void putAll(Map<? extends K, ? extends V> t)
+            throws GroupMapUpdateException, IllegalStateException {
+        putAll(t, isSharedWrites(), isRemoveOwnedObjectsOnExit(),
+                getTimeToLive());
     }
 
     /**
      * Add the Map to the distribution
+     * 
      * @param t
      * @param sharedWrites
      * @param removeOnExit
-     * @throws IllegalAccessException
+     * @param timeToLive
+     * @throws GroupMapUpdateException
      * @throws IllegalStateException
      */
-    public void putAll(Map<? extends K, ? extends V> t,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException,IllegalStateException {
-        for(java.util.Map.Entry<? extends K, ? extends V>entry:t.entrySet()) {
-            put(entry.getKey(),entry.getValue(),sharedWrites,removeOnExit);
-        }        
+    public void putAll(Map<? extends K, ? extends V> t, boolean sharedWrites,
+            boolean removeOnExit, long timeToLive)
+            throws GroupMapUpdateException, IllegalStateException {
+        for (java.util.Map.Entry<? extends K, ? extends V> entry : t.entrySet()) {
+            put(entry.getKey(), entry.getValue(), sharedWrites, removeOnExit,
+                    timeToLive);
+        }
     }
 
     /**
      * remove a value from the map associated with the key
-     * @param key 
+     * 
+     * @param key
      * @return the Value or null
-     * @throws IllegalAccessException 
-     * @throws IllegalStateException 
+     * @throws GroupMapUpdateException
+     * @throws IllegalStateException
      * 
      */
-    public V remove(Object key) throws IllegalAccessException,IllegalStateException{
-        EntryKey<K> entryKey = new EntryKey<K>(this.local,(K) key);
+    public V remove(Object key) throws GroupMapUpdateException,
+            IllegalStateException {
+        EntryKey<K> entryKey = new EntryKey<K>(this.local, (K) key);
         return remove(entryKey);
     }
-    
-    V remove(EntryKey<K> key) throws IllegalAccessException,IllegalStateException{
-        checkStarted();
+
+    V remove(EntryKey<K> key) throws GroupMapUpdateException,
+            IllegalStateException {
+        checkStatus();
         EntryMessage entryMsg = new EntryMessage();
         entryMsg.setKey(key);
         entryMsg.setType(EntryMessage.MessageType.DELETE);
-        return sendEntryMessage(entryMsg);
+        return (V) sendRequest(getCoordinator(), entryMsg);
     }
-    
-       
+
     public int size() {
-        synchronized(this.mapMutex) {
-            return this.localMap != null ? this.localMap.size():0;
+        synchronized (this.mapMutex) {
+            return this.localMap != null ? this.localMap.size() : 0;
         }
     }
 
     public Collection<V> values() {
         List<V> result = new ArrayList<V>();
-        synchronized(this.mapMutex) {
-            if(this.localMap!=null) {
-                for (EntryValue<V> value:this.localMap.values()) {
+        synchronized (this.mapMutex) {
+            if (this.localMap != null) {
+                for (EntryValue<V> value : this.localMap.values()) {
                     result.add(value.getValue());
                 }
             }
         }
         return result;
     }
-    
+
     /**
-     * @return a set of the members 
+     * @return a set of the members
      */
-    public Set<Member> members(){
-        Set<Member>result = new HashSet<Member>();
+    public Set<Member> members() {
+        Set<Member> result = new HashSet<Member>();
         result.addAll(this.members.values());
         return result;
     }
-    
+
     /**
      * @param key
      * @return true if this is the owner of the key
      */
     public boolean isOwner(K key) {
-        EntryKey<K> stateKey = new EntryKey<K>(this.local,key);
+        EntryKey<K> stateKey = new EntryKey<K>(this.local, key);
         EntryValue<V> entryValue = null;
-        synchronized(this.mapMutex) {
-            entryValue = this.localMap != null ? this.localMap.get(stateKey):null;
+        synchronized (this.mapMutex) {
+            entryValue = this.localMap != null ? this.localMap.get(stateKey)
+                    : null;
         }
         boolean result = false;
         if (entryValue != null) {
@@ -488,102 +637,152 @@
         }
         return result;
     }
-    
+
     /**
      * Get the owner of a key
+     * 
      * @param key
      * @return the owner - or null if the key doesn't exist
      */
     public Member getOwner(K key) {
-        EntryKey<K> stateKey = new EntryKey<K>(this.local,key);
+        EntryKey<K> stateKey = new EntryKey<K>(this.local, key);
         EntryValue<V> entryValue = null;
-        synchronized(this.mapMutex) {
-            entryValue = this.localMap != null ? this.localMap.get(stateKey):null;
+        synchronized (this.mapMutex) {
+            entryValue = this.localMap != null ? this.localMap.get(stateKey)
+                    : null;
         }
-        return entryValue != null ? entryValue.getOwner():null;
+        return entryValue != null ? entryValue.getOwner() : null;
     }
-    
+
     /**
      * @return true if the coordinator for the map
      */
     public boolean isCoordinator() {
         return this.local.equals(this.coordinator);
     }
-    
+
+    /**
+     * @return the coordinator
+     */
+    public Member getCoordinator() {
+        return this.coordinator;
+    }
+
     /**
-     * Select a coordinator - by default, its the member with 
-     * the lowest lexicographical id 
+     * Select a coordinator - by default, its the member with the lowest
+     * lexicographical id
+     * 
      * @param members
      * @return
      */
-    protected Member selectCordinator(Collection<Member>members) {
+    protected Member selectCordinator(Collection<Member> members) {
         Member result = this.local;
-        for (Member member:members) {
+        for (Member member : members) {
             if (result.getId().compareTo(member.getId()) < 0) {
                 result = member;
             }
         }
-        return result;   
+        return result;
     }
-    
-    V sendEntryMessage(EntryMessage entry) {
+
+    Object sendRequest(Member member, Serializable payload) {
         Object result = null;
         MapRequest request = new MapRequest();
-        String id = this.requestGenerator.generateId();
-        synchronized(this.requests) {
-            this.requests.put(id,request);
+        String id = this.idGenerator.generateId();
+        synchronized (this.requests) {
+            this.requests.put(id, request);
         }
         try {
-            ObjectMessage objMsg = this.session.createObjectMessage(entry);
+            ObjectMessage objMsg = this.session.createObjectMessage(payload);
             objMsg.setJMSReplyTo(this.inboxTopic);
             objMsg.setJMSCorrelationID(id);
-            this.producer.send(this.topic, objMsg);
-            result = request.get(getHeartBeatInterval()*2);
-        }catch(JMSException e) {
-            if(this.started.get()) {
-                LOG.error("Failed to send EntryMessage " + entry,e);
+            this.producer.send(member.getInBoxDestination(), objMsg);
+            result = request.get(getHeartBeatInterval() * 200000);
+        } catch (JMSException e) {
+            if (this.started.get()) {
+                LOG.error("Failed to send request " + payload, e);
             }
         }
-        if (result instanceof IllegalAccessException) {
-            throw (IllegalAccessException)result;
+        if (result instanceof GroupMapUpdateException) {
+            throw (GroupMapUpdateException) result;
         }
-        return (V) result;
+        return result;
     }
-    
-    void handleResponses(Message message) {
+
+    void sendAsyncRequest(AsyncMapRequest asyncRequest, Member member,
+            Serializable payload) {
+        MapRequest request = new MapRequest();
+        String id = this.idGenerator.generateId();
+        asyncRequest.add(id, request);
+        synchronized (this.requests) {
+            this.requests.put(id, request);
+        }
+        try {
+            ObjectMessage objMsg = this.session.createObjectMessage(payload);
+            objMsg.setJMSReplyTo(this.inboxTopic);
+            objMsg.setJMSCorrelationID(id);
+            this.producer.send(member.getInBoxDestination(), objMsg);
+        } catch (JMSException e) {
+            if (this.started.get()) {
+                LOG.error("Failed to send async request " + payload, e);
+            }
+        }
+    }
+
+    void sendReply(Object reply, Destination replyTo, String id) {
+        try {
+            ObjectMessage replyMsg = this.session
+                    .createObjectMessage((Serializable) reply);
+            replyMsg.setJMSCorrelationID(id);
+            this.producer.send(replyTo, replyMsg);
+        } catch (JMSException e) {
+            LOG.error("Couldn't send reply from co-ordinator", e);
+        }
+    }
+
+    void broadcastMapUpdate(EntryMessage entry, String correlationId) {
+        try {
+            EntryMessage copy = entry.copy();
+            copy.setMapUpdate(true);
+            ObjectMessage objMsg = this.session.createObjectMessage(copy);
+            objMsg.setJMSCorrelationID(correlationId);
+            this.producer.send(this.topic, objMsg);
+        } catch (JMSException e) {
+            if (this.started.get()) {
+                LOG.error("Failed to send EntryMessage " + entry, e);
+            }
+        }
+    }
+
+    void processMessage(Message message) {
         if (message instanceof ObjectMessage) {
             ObjectMessage objMsg = (ObjectMessage) message;
             try {
+                String id = objMsg.getJMSCorrelationID();
+                Destination replyTo = objMsg.getJMSReplyTo();
                 Object payload = objMsg.getObject();
                 if (payload instanceof Member) {
-                    handleHeartbeats((Member)payload);
-                } else if(payload instanceof EntryMessage) {
+                    handleHeartbeats((Member) payload);
+                } else if (payload instanceof EntryMessage) {
                     EntryMessage entryMsg = (EntryMessage) payload;
-                    EntryKey<K>key=entryMsg.getKey();
-                    EntryValue<V> value = new EntryValue<V>(key.getOwner(),(V) entryMsg.getValue());
-                    
-                    if(this.localMap !=null) {
-                        boolean fireUpdate = false;
-                        synchronized(this.mapMutex) {
-                            if(!this.localMap.containsKey(key)) {
-                                this.localMap.put(key,value);
-                                fireUpdate=true;
-                            }
-                        }
-                        if(fireUpdate) {
-                            fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue());
-                        }
+                    entryMsg = entryMsg.copy();
+                    if (entryMsg.isMapUpdate()) {
+                        processMapUpdate(entryMsg);
+                    } else {
+                        processEntryMessage(entryMsg, replyTo, id);
                     }
-                   
-                    
-                }else {
-                    String id = objMsg.getJMSCorrelationID();
+                } else if (payload instanceof ElectionMessage) {
+                    ElectionMessage electionMsg = (ElectionMessage) payload;
+                    electionMsg = electionMsg.copy();
+                    processElectionMessage(electionMsg, replyTo, id);
+                }
+                if (id != null) {
                     MapRequest result = null;
                     synchronized (this.requests) {
                         result = this.requests.remove(id);
                     }
                     if (result != null) {
-                        result.put(objMsg.getObject());
+                        result.put(id, objMsg.getObject());
                     }
                 }
             } catch (JMSException e) {
@@ -591,83 +790,92 @@
             }
         }
     }
-    
-    void handleMapUpdates(Message message) {
-        Object reply = null;
-        if (message instanceof ObjectMessage) {
-            try {
-                ObjectMessage objMsg = (ObjectMessage) message;
-                EntryMessage entryMsg = (EntryMessage) objMsg.getObject();
-                EntryKey<K> key = entryMsg.getKey();
-                EntryValue<V> value = new EntryValue<V>(key.getOwner(),(V) entryMsg.getValue());
-                boolean containsKey=false;
-                boolean mapExists = false;
-                synchronized(this.mapMutex) {
-                    mapExists = this.localMap!=null;
-                    if(mapExists) {
-                        containsKey=this.localMap.containsKey(key);
-                    }
-                }
-                if(mapExists) {
-                    if (containsKey) {
-                        Member owner = getOwner((K) key.getKey());
-                        if (owner.equals(key.getOwner()) && !key.isShare()) {
-                            EntryValue<V> old = null;
-                            if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) {
-                                synchronized(this.mapMutex) {
-                                    old = this.localMap.put(key, value);     
-                                }
-                            }else {
-                                synchronized(this.mapMutex) {
-                                    old = this.localMap.remove(key);
-                                }
-                            }
-                            fireMapChanged(owner, key.getKey(), old.getValue(), value.getValue());
-                        }else {
-                            reply = new IllegalAccessException("Owned by "+ owner);
+
+    void processEntryMessage(EntryMessage entryMsg, Destination replyTo,
+            String correlationId) {
+        if (isCoordinator()) {
+            EntryKey<K> key = entryMsg.getKey();
+            EntryValue<V> value = new EntryValue<V>(key.getOwner(),
+                    (V) entryMsg.getValue());
+            boolean insert = entryMsg.isInsert();
+            boolean containsKey = false;
+            synchronized (this.mapMutex) {
+                containsKey = this.localMap.containsKey(key);
+            }
+            if (containsKey) {
+                Member owner = getOwner((K) key.getKey());
+                if (owner.equals(key.getOwner()) || key.isShare()) {
+                    EntryValue<V> old = null;
+                    if (insert) {
+                        synchronized (this.mapMutex) {
+                            old = this.localMap.put(key, value);
                         }
-                    }else {
-                        if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) {
-                            synchronized(this.mapMutex) {
-                                this.localMap.put(key, value);
-                            }
-                            fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue());
+                    } else {
+                        synchronized (this.mapMutex) {
+                            old = this.localMap.remove(key);
                         }
                     }
+                    broadcastMapUpdate(entryMsg, correlationId);
+                    fireMapChanged(owner, key.getKey(), old.getValue(), value
+                            .getValue(), false);
+                } else {
+                    Serializable reply = new GroupMapUpdateException(
+                            "Owned by " + owner);
+                    sendReply(reply, replyTo, correlationId);
+                }
+            } else {
+                if (insert) {
+                    synchronized (this.mapMutex) {
+                        this.localMap.put(key, value);
+                    }
+                    broadcastMapUpdate(entryMsg, correlationId);
+                    fireMapChanged(key.getOwner(), key.getKey(), null, value
+                            .getValue(), false);
+                } else {
+                    sendReply(null, replyTo, correlationId);
                 }
-            } catch (JMSException e) {
-                LOG.warn("Failed to process map update",e);
-                reply = e;
             }
-           
-            try {
-                Destination replyTo = message.getJMSReplyTo();
-                String correlationId = message.getJMSCorrelationID();
-                ObjectMessage replyMsg = this.session
-                        .createObjectMessage((Serializable) reply);
-                replyMsg.setJMSCorrelationID(correlationId);
-                // reuse timestamp - this will be cleared by the producer on
-                // send
-                replyMsg.setJMSTimestamp(System.currentTimeMillis());
-                if (isCoordinator()) {
-                    this.producer.send(replyTo, replyMsg);
-                }else {
-                    synchronized(mapUpdateReplies) {
-                        this.mapUpdateReplies.add(replyMsg);
+        }
+    }
+
+    void processMapUpdate(EntryMessage entryMsg) {
+        boolean containsKey = false;
+        EntryKey<K> key = entryMsg.getKey();
+        EntryValue<V> value = new EntryValue<V>(key.getOwner(), (V) entryMsg
+                .getValue());
+        boolean insert = entryMsg.isInsert()||entryMsg.isSync();
+        synchronized (this.mapMutex) {
+            containsKey = this.localMap.containsKey(key);
+        }
+       
+        if (!isCoordinator()||entryMsg.isSync()) {
+            if (containsKey) {
+                Member owner = getOwner((K) key.getKey());
+                EntryValue<V> old = null;
+                if (insert) {
+                    synchronized (this.mapMutex) {
+                        old = this.localMap.put(key, value);
+                    }
+                } else {
+                    synchronized (this.mapMutex) {
+                        old = this.localMap.remove(key);
+                        value.setValue(null);
                     }
                 }
-            } catch (JMSException e) {
-                if(this.started.get()) {
-                    LOG.error("Failed to send response to a map update ", e);
+                fireMapChanged(owner, key.getKey(), old.getValue(), value
+                        .getValue(), entryMsg.isExpired());
+            } else {
+                if (insert) {
+                    synchronized (this.mapMutex) {
+                        this.localMap.put(key, value);
+                    }
+                    fireMapChanged(key.getOwner(), key.getKey(), null, value
+                            .getValue(), false);
                 }
             }
-            
-            
-        }else {
-            LOG.warn("Unexpected map update message " + message);
         }
     }
-    
+
     void handleHeartbeats(Message message) {
         try {
             if (message instanceof ObjectMessage) {
@@ -685,63 +893,99 @@
     void handleHeartbeats(Member member) {
         member.setTimeStamp(System.currentTimeMillis());
         if (this.members.put(member.getId(), member) == null) {
+            election(member, true);
             fireMemberStarted(member);
             if (!member.equals(this.local)) {
-                //send the new member our details
                 sendHeartBeat(member.getInBoxDestination());
-                if(isCoordinator()) {
-                    updateNewMemberMap(member);
-                }
+            }
+            synchronized (this.memberMutex) {
+                this.memberMutex.notifyAll();
             }
         }
     }
-    
+
     void handleConsumerEvents(ConsumerEvent event) {
         if (!event.isStarted()) {
-            Member member = this.members.remove(event.getConsumerId().toString());
-            if(member!=null) {
+            Member member = this.members.remove(event.getConsumerId()
+                    .toString());
+            if (member != null) {
                 fireMemberStopped(member);
-                doElection();
+                election(member, false);
             }
         }
     }
-    
+
     void checkMembership() {
-        if (this.started.get()) {
-            long checkTime = System.currentTimeMillis()-getHeartBeatInterval();
+        if (this.started.get() && this.electionFinished.get()) {
+            long checkTime = System.currentTimeMillis()
+                    - getHeartBeatInterval();
             boolean doElection = false;
             for (Member member : this.members.values()) {
-                if (member.getTimeStamp()<checkTime) {
+                if (member.getTimeStamp() < checkTime) {
                     LOG.info("Member timestamp expired " + member);
                     this.members.remove(member.getId());
                     fireMemberStopped(member);
-                    doElection=true;
-                    
+                    doElection = true;
                 }
             }
             if (doElection) {
-                doElection();
+                election(null, false);
             }
         }
-        //clear down cached reply messages
-        long checkTime = System.currentTimeMillis()-(getHeartBeatInterval()*2);
-        List<Message> tmpList = new ArrayList<Message>();
-        synchronized(this.mapUpdateReplies) {
-            try {
-                for(Message msg:this.mapUpdateReplies) {
-                    if (msg.getJMSTimestamp() < checkTime) {
-                        tmpList.add(msg);
+    }
+    
+    void expirationSweep() {
+        if (isCoordinator() && this.started.get() && this.electionFinished.get()) {
+            List<EntryKey> list = null;
+            synchronized (this.mapMutex) {
+                Map<EntryKey<K>, EntryValue<V>> map = this.localMap;
+                if (map != null) {
+                    long currentTime = System.currentTimeMillis();
+                    for (EntryKey k : map.keySet()) {
+                        if (k.isExpired(currentTime)) {
+                            if (list == null) {
+                                list = new ArrayList<EntryKey>();
+                                list.add(k);
+                            }
+                        }
                     }
                 }
-                for(Message msg:tmpList) {
-                    this.mapUpdateReplies.remove(msg);
-                }
-            }catch(JMSException e) {
-                LOG.warn("Failed to clear down mapUpdateReplies",e);
+            }
+            //do the actual removal of entries in a separate thread
+            if (list != null) {
+                final List<EntryKey> expire = list;
+                this.executor.execute(new Runnable() {
+                    public void run() {
+                        doExpiration(expire);
+                    }
+                });
             }
         }
+        
     }
     
+    void doExpiration(List<EntryKey> list) {
+        if (this.started.get() && this.electionFinished.get()
+                && isCoordinator()) {
+            for (EntryKey k : list) {
+                EntryValue<V> old = null;
+                synchronized (this.mapMutex) {
+                    old = this.localMap.remove(k);
+                }
+                if (old != null) {
+                    EntryMessage entryMsg = new EntryMessage();
+                    entryMsg.setType(EntryMessage.MessageType.DELETE);
+                    entryMsg.setExpired(true);
+                    entryMsg.setKey(k);
+                    entryMsg.setValue(old.getValue());
+                    broadcastMapUpdate(entryMsg, "");
+                    fireMapChanged(k.getOwner(), k.getKey(), old.getValue(),
+                            null, true);
+                }
+            }
+        }
+    }
+
     void sendHeartBeat() {
         sendHeartBeat(this.heartBeatTopic);
     }
@@ -752,14 +996,16 @@
                 ObjectMessage msg = this.session
                         .createObjectMessage(this.local);
                 this.producer.send(destination, msg);
+            } catch (javax.jms.IllegalStateException e) {
+                // ignore - as we are probably stopping
             } catch (Throwable e) {
-                if(this.started.get()) {
+                if (this.started.get()) {
                     LOG.warn("Failed to send heart beat", e);
                 }
             }
         }
     }
-    
+
     void updateNewMemberMap(Member member) {
         List<Map.Entry<EntryKey<K>, EntryValue<V>>> list = new ArrayList<Map.Entry<EntryKey<K>, EntryValue<V>>>();
         synchronized (this.mapMutex) {
@@ -775,10 +1021,11 @@
                 EntryMessage entryMsg = new EntryMessage();
                 entryMsg.setKey(entry.getKey());
                 entryMsg.setValue(entry.getValue().getValue());
-                entryMsg.setType(EntryMessage.MessageType.INSERT);
+                entryMsg.setType(EntryMessage.MessageType.SYNC);
+                entryMsg.setMapUpdate(true);
                 ObjectMessage objMsg = this.session
                         .createObjectMessage(entryMsg);
-                if(!member.equals(entry.getKey().getOwner())) {
+                if (!member.equals(entry.getKey().getOwner())) {
                     this.producer.send(member.getInBoxDestination(), objMsg);
                 }
             }
@@ -788,26 +1035,26 @@
             }
         }
     }
-    
+
     void fireMemberStarted(Member member) {
-        LOG.info(this.local.getName()+" Member started " + member);
+        LOG.info(this.local.getName() + " Member started " + member);
         for (MemberChangedListener l : this.membershipListeners) {
             l.memberStarted(member);
         }
     }
-    
+
     void fireMemberStopped(Member member) {
-        LOG.info(this.local.getName()+" Member stopped " + member);
+        LOG.info(this.local.getName() + " Member stopped " + member);
         for (MemberChangedListener l : this.membershipListeners) {
             l.memberStopped(member);
         }
-        //remove all entries owned by the stopped member
-        List<EntryKey<K>>tmpList = new ArrayList<EntryKey<K>>();
+        // remove all entries owned by the stopped member
+        List<EntryKey<K>> tmpList = new ArrayList<EntryKey<K>>();
         boolean mapExists = false;
-        synchronized(this.mapMutex) {
-            mapExists=this.localMap!=null;
-            if(mapExists) {
-                for (EntryKey<K> entryKey:this.localMap.keySet()) {
+        synchronized (this.mapMutex) {
+            mapExists = this.localMap != null;
+            if (mapExists) {
+                for (EntryKey<K> entryKey : this.localMap.keySet()) {
                     if (entryKey.getOwner().equals(member)) {
                         if (entryKey.isRemoveOnExit()) {
                             tmpList.add(entryKey);
@@ -816,49 +1063,154 @@
                 }
             }
         }
-        if(mapExists) {
-            for (EntryKey<K> entryKey:tmpList) {
+        if (mapExists) {
+            for (EntryKey<K> entryKey : tmpList) {
                 EntryValue<V> value = null;
-                synchronized(this.mapMutex) {
+                synchronized (this.mapMutex) {
                     value = this.localMap.remove(entryKey);
                 }
-                fireMapChanged(member, entryKey.getKey(), value.getValue(), null);
+                fireMapChanged(member, entryKey.getKey(), value.getValue(),
+                        null,false);
             }
         }
     }
-    
-    void fireMapChanged(Member owner,Object key, Object oldValue, Object newValue) {
-        for (MapChangedListener l:this.mapChangedListeners) {
-            l.mapChanged(owner, key, oldValue, newValue);
+
+    void fireMapChanged(final Member owner, final Object key,
+            final Object oldValue, final Object newValue, final boolean expired) {
+        if (this.started.get() && this.executor != null
+                && !this.executor.isShutdown()) {
+            this.executor.execute(new Runnable() {
+                public void run() {
+                    doFireMapChanged(owner, key, oldValue, newValue, expired);
+                }
+            });
         }
     }
-    
-    void doElection() {
-        this.coordinator=selectCordinator(this.members.values());
-        if (isCoordinator() && this.started.get()) {
-            //send any inflight requests
-            List<Message>list = new ArrayList<Message>();
-            synchronized(this.mapUpdateReplies) {
-                list.addAll(this.mapUpdateReplies);
-                this.mapUpdateReplies.clear();
+
+    void doFireMapChanged(Member owner, Object key, Object oldValue,
+            Object newValue, boolean expired) {
+        for (MapChangedListener l : this.mapChangedListeners) {
+            if (oldValue == null) {
+                l.mapInsert(owner, key, newValue);
+            } else if (newValue == null) {
+                l.mapRemove(owner, key, oldValue, expired);
+            } else {
+                l.mapUpdate(owner, key, oldValue, newValue);
             }
-            try {
-            for(Message msg:list) {
-                if (this.started.get()) {
-                    this.producer.send(msg.getJMSReplyTo(), msg);
+        }
+    }
+
+    void election(final Member member, final boolean memberStarted) {
+        if (this.started.get() && this.executor != null
+                && !this.executor.isShutdown()) {
+            this.executor.execute(new Runnable() {
+                public void run() {
+                    doElection(member, memberStarted);
                 }
+            });
+        }
+    }
+
+    void doElection(Member member, boolean memberStarted) {
+        if ((member == null || !member.equals(this.local))
+                && this.electionFinished.compareAndSet(true, false)) {
+            boolean wasCoordinator = isCoordinator() && !isEmpty();
+            // call an election
+            while (!callElection())
+                ;
+            List<Member> members = new ArrayList<Member>(this.members.values());
+            this.coordinator = selectCordinator(members);
+            if (isCoordinator()) {
+                broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
             }
-            }catch(JMSException e) {
-                if(this.started.get()) {
-                    LOG.error("Failed to resend replies",e);
+            if (memberStarted && member != null) {
+                if (wasCoordinator || isCoordinator() && this.started.get()) {
+                    updateNewMemberMap(member);
                 }
             }
+            if (!this.electionFinished.get()) {
+                try {
+                    synchronized (this.electionFinished) {
+                        this.electionFinished.wait(this.heartBeatInterval * 2);
+                    }
+                } catch (InterruptedException e) {
+                }
+            }
+            if (!this.electionFinished.get()) {
+                // we must be the coordinator
+                this.coordinator = this.local;
+                this.electionFinished.set(true);
+                broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
+            }
         }
-    } 
-    
-    void checkStarted() throws IllegalStateException{
+    }
+
+    boolean callElection() {
+        List<Member> members = new ArrayList<Member>(this.members.values());
+        AsyncMapRequest request = new AsyncMapRequest();
+        for (Member member : members) {
+            if (this.local.getId().compareTo(member.getId()) < 0) {
+                ElectionMessage msg = new ElectionMessage();
+                msg.setMember(this.local);
+                msg.setType(ElectionMessage.MessageType.ELECTION);
+                sendAsyncRequest(request, member, msg);
+            }
+        }
+        return request.isSuccess(getHeartBeatInterval());
+    }
+
+    void processElectionMessage(ElectionMessage msg, Destination replyTo,
+            String correlationId) {
+        if (msg.isElection()) {
+            msg.setType(ElectionMessage.MessageType.ANSWER);
+            msg.setMember(this.local);
+            sendReply(msg, replyTo, correlationId);
+        } else if (msg.isCoordinator()) {
+            synchronized (this.electionFinished) {
+                this.coordinator = msg.getMember();
+                this.electionFinished.set(true);
+                this.electionFinished.notifyAll();
+            }
+        }
+    }
+
+    void broadcastElectionType(ElectionMessage.MessageType type) {
+        if (started.get()) {
+            try {
+                ElectionMessage msg = new ElectionMessage();
+                msg.setMember(this.local);
+                msg.setType(type);
+                ObjectMessage objMsg = this.session.createObjectMessage(msg);
+                this.producer.send(this.topic, objMsg);
+            } catch (javax.jms.IllegalStateException e) {
+                // ignore - we are stopping
+            } catch (JMSException e) {
+                if (this.started.get()) {
+                    LOG.error("Failed to broadcast election message: " + type,
+                            e);
+                }
+            }
+        }
+    }
+
+    void checkStatus() throws IllegalStateException {
         if (!started.get()) {
-            throw new IllegalStateException("GroupMap " + this.local.getName() + " not started");
+            throw new IllegalStateException("GroupMap " + this.local.getName()
+                    + " not started");
+        }
+        waitForElection();
+    }
+    
+    void waitForElection() {
+        synchronized (this.electionFinished) {
+            while (started.get() && !this.electionFinished.get()) {
+                try {
+                    this.electionFinished.wait(1000);
+                } catch (InterruptedException e) {
+                    stop();
+                    Thread.currentThread().interrupt();
+                }
+            }
         }
-    }   
+    }
 }

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java (from r679849, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java&r1=679849&r2=683259&rev=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java Wed Aug  6 06:25:27 2008
@@ -20,13 +20,13 @@
  * thrown when updating a key to map that the local client doesn't own
  *
  */
-public class IllegalAccessException extends java.lang.IllegalStateException {
+public class GroupMapUpdateException extends RuntimeException {
     private static final long serialVersionUID = -7584658017201604560L;
     
     /**
      * @param message
      */
-    public IllegalAccessException(String message) {
+    public GroupMapUpdateException(String message) {
         super(message);
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java Wed Aug  6 06:25:27 2008
@@ -23,11 +23,28 @@
 public interface MapChangedListener {
     
     /**
-     * Called when the map has changed
+     * Called when a key/value pair is inserted into the map
      * @param owner 
      * @param key
+     * @param value 
+     */
+    void mapInsert(Member owner,Object key, Object value);
+    
+    /**
+     * Called when a key value is updated in the map
+     * @param owner
+     * @param Key
      * @param oldValue
      * @param newValue
      */
-    void mapChanged(Member owner,Object key, Object oldValue, Object newValue);
+    void mapUpdate(Member owner,Object Key,Object oldValue,Object newValue);
+    
+    /**
+     * Called when a key value is removed from the map
+     * @param owner
+     * @param key
+     * @param value
+     * @param expired
+     */
+    void mapRemove(Member owner,Object key, Object value,boolean expired);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java Wed Aug  6 06:25:27 2008
@@ -25,12 +25,13 @@
 public class MapRequest {
     private final AtomicBoolean done = new AtomicBoolean();
     private Object response;
+    private RequestCallback callback;
 
-    Object get(int timeout) {
-        synchronized (done) {
-            if (done.get() == false && response == null) {
+    Object get(long timeout) {
+        synchronized (this.done) {
+            if (this.done.get() == false && this.response == null) {
                 try {
-                    done.wait(timeout);
+                    this.done.wait(timeout);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                 }
@@ -39,15 +40,23 @@
         return this.response;
     }
 
-    void put(Object response) {
+    void put(String id,Object response) {
         this.response = response;
         cancel();
+        RequestCallback callback = this.callback;
+        if (callback != null) {
+            callback.finished(id);
+        }
     }
 
     void cancel() {
-        done.set(true);
-        synchronized (done) {
-            done.notifyAll();
+        this.done.set(true);
+        synchronized (this.done) {
+            this.done.notifyAll();
         }
     }
+    
+    void setCallback(RequestCallback callback) {
+        this.callback=callback;
+    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java Wed Aug  6 06:25:27 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+
+/**
+ * Return information about map update
+ * 
+ */
+public interface RequestCallback{
+    /**
+     * Optionally called when a request is finished
+     * @param id
+     */
+    void finished(String id);
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java Wed Aug  6 06:25:27 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+
+public class GroupMapMemberTest extends TestCase {
+    protected BrokerService broker;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+    /**
+     * Test method for
+     * {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
+     * @throws Exception 
+     */
+    public void testGroup() throws Exception {
+        
+        int number = 20;
+        List<Connection>connections = new ArrayList<Connection>();
+        List<GroupMap>groupMaps = new ArrayList<GroupMap>();
+        ConnectionFactory factory = createConnectionFactory();
+        for (int i =0; i < number; i++) {
+            Connection connection = factory.createConnection();
+            connection.start();
+            connections.add(connection);
+            GroupMap map = new GroupMap(connection,"map"+i);
+            map.setHeartBeatInterval(20000);
+            if(i ==number-1) {
+                map.setMinimumGroupSize(number);
+            }
+            map.start();
+            groupMaps.add(map);
+        }
+        
+        int coordinator = 0;
+        for (GroupMap map:groupMaps) {
+            if (map.isCoordinator()) {
+                coordinator++;
+            }
+        }
+               
+        assertEquals(1,coordinator);
+        groupMaps.get(0).put("key", "value");
+        Thread.sleep(2000);
+        for (GroupMap map:groupMaps) {
+            assertTrue(map.get("key").equals("value"));
+        }
+        for(GroupMap map:groupMaps) {
+            map.stop();
+        }
+        for (Connection connection:connections) {
+            connection.stop();
+        }
+        
+    }
+
+    
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory()throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+                ActiveMQConnection.DEFAULT_BROKER_URL);
+        return cf;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}
+

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java
------------------------------------------------------------------------------
    svn:eol-style = native