You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/08/06 15:25:29 UTC
svn commit: r683259 [1/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/group/ test/java/org/apache/activemq/group/
Author: rajdavies
Date: Wed Aug 6 06:25:27 2008
New Revision: 683259
URL: http://svn.apache.org/viewvc?rev=683259&view=rev
Log:
Ensure ordered access to the group and add expiration to state in
the GroupMap
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java
- copied, changed from r679849, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java (with props)
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java Wed Aug 6 06:25:27 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tracks outstanding map update requests and lets a caller wait until all of
+ * them have been reported as finished.
+ */
+public class AsyncMapRequest implements RequestCallback{
+    /** Guards {@code requests}; waiters block on it until a request finishes. */
+    private final Object mutex = new Object();
+    private final Set<String> requests = new HashSet<String>();
+
+    /** Tracks {@code id} as outstanding and registers this object as the callback of {@code request}. */
+    public void add(String id, MapRequest request) {
+        request.setCallback(this);
+        synchronized (this.mutex) {
+            this.requests.add(id);
+        }
+    }
+
+    /**
+     * Wait until every tracked request has finished, the timeout elapses or the thread is interrupted.
+     * @param timeout maximum time to wait in milliseconds; zero or negative means do not block
+     * @return true if no tracked request is outstanding when the method returns
+     */
+    public boolean isSuccess(long timeout) {
+        long start = System.currentTimeMillis();
+        synchronized (this.mutex) {
+            try {
+                // wait(0) blocks indefinitely, so stop as soon as nothing is left of the timeout
+                for (long left = timeout; left > 0 && !this.requests.isEmpty(); left = timeout - (System.currentTimeMillis() - start)) {
+                    this.mutex.wait(left);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+            }
+            return this.requests.isEmpty();
+        }
+    }
+
+    /** Marks {@code id} as finished and wakes any thread blocked in {@link #isSuccess(long)}. */
+    public void finished(String id) {
+        synchronized (this.mutex) {
+            this.requests.remove(id);
+            this.mutex.notifyAll();
+        }
+    }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/AsyncMapRequest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java Wed Aug 6 06:25:27 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+/**
+ * Default implementation of a MapChangedListener
+ *
+ */
+public class DefaultMapChangedListener implements MapChangedListener {
+
+    /** Ignores the insert notification. */
+    public void mapInsert(Member owner, Object key, Object value) {}
+
+    /** Ignores the remove notification. */
+    public void mapRemove(Member owner, Object key, Object value, boolean expired) {}
+
+    /** Ignores the update notification. */
+    public void mapUpdate(Member owner, Object Key, Object oldValue, Object newValue) {}
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/DefaultMapChangedListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java Wed Aug 6 06:25:27 2008
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Used to pass information around
+ *
+ */
+public class ElectionMessage implements Externalizable {
+    enum MessageType { ELECTION, ANSWER, COORDINATOR }
+    private MessageType type;
+    private Member member;
+
+    /**
+     * @return the member carried by this message
+     */
+    public Member getMember() {
+        return member;
+    }
+
+    /**
+     * @param member the member this message should carry
+     */
+    public void setMember(Member member) {
+        this.member = member;
+    }
+
+    /**
+     * @return the kind of election message, or null if none was set
+     */
+    public MessageType getType() {
+        return type;
+    }
+
+    /**
+     * @param type the kind of election message
+     */
+    public void setType(MessageType type) {
+        this.type = type;
+    }
+
+    /**
+     * @return true only when a type is set and it is ELECTION
+     */
+    public boolean isElection() {
+        return MessageType.ELECTION == type;
+    }
+
+    /**
+     * @return true only when a type is set and it is ANSWER
+     */
+    public boolean isAnswer() {
+        return MessageType.ANSWER == type;
+    }
+
+    /**
+     * @return true only when a type is set and it is COORDINATOR
+     */
+    public boolean isCoordinator() {
+        return MessageType.COORDINATOR == type;
+    }
+
+    /** @return a new message referencing the same member and type (shallow copy) */
+    public ElectionMessage copy() {
+        ElectionMessage duplicate = new ElectionMessage();
+        duplicate.type = type;
+        duplicate.member = member;
+        return duplicate;
+    }
+
+    /** Reads the member and then the type, the same order writeExternal emits them. */
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        member = (Member) in.readObject();
+        type = (MessageType) in.readObject();
+    }
+
+    /** Writes the member and then the type. */
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(member);
+        out.writeObject(type);
+    }
+
+    public String toString() {
+        return new StringBuilder("ElectionMessage: ").append(member).append("{").append(type).append("}").toString();
+    }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/ElectionMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryKey.java Wed Aug 6 06:25:27 2008
@@ -30,6 +30,7 @@
private K key;
private boolean share;
private boolean removeOnExit;
+ private long expiration;
/**
* Default constructor - for serialization
@@ -88,6 +89,38 @@
public void setRemoveOnExit(boolean removeOnExit) {
this.removeOnExit = removeOnExit;
}
+
+ /**
+ * @return the expiration
+ */
+ public long getExpiration() {
+ return expiration;
+ }
+
+ /**
+ * @param expiration the expiration to set
+ */
+ public void setExpiration(long expiration) {
+ this.expiration = expiration;
+ }
+
+ void setTimeToLive(long ttl) {
+ if (ttl > 0 ) {
+ this.expiration=ttl+System.currentTimeMillis();
+ }else {
+ this.expiration =0l;
+ }
+ }
+
+ boolean isExpired() {
+ return isExpired(System.currentTimeMillis());
+ }
+
+ boolean isExpired(long currentTime) {
+ return this.expiration > 0 && this.expiration < currentTime;
+ }
+
+
public boolean equals(Object obj) {
boolean result = false;
@@ -103,6 +136,7 @@
out.writeObject(this.key);
out.writeBoolean(isShare());
out.writeBoolean(isRemoveOnExit());
+ out.writeLong(getExpiration());
}
public void readExternal(ObjectInput in) throws IOException,
@@ -111,5 +145,10 @@
this.key = (K) in.readObject();
this.share = in.readBoolean();
this.removeOnExit=in.readBoolean();
+ this.expiration=in.readLong();
+ }
+
+ public String toString() {
+ return "key:"+this.key;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryMessage.java Wed Aug 6 06:25:27 2008
@@ -26,19 +26,21 @@
*
*/
public class EntryMessage implements Externalizable{
- static enum MessageType{INSERT,DELETE};
+ static enum MessageType{INSERT,DELETE,SYNC};
private EntryKey key;
private Object value;
private MessageType type;
+ private boolean mapUpdate;
+ private boolean expired;
/**
* @return the owner
*/
public EntryKey getKey() {
- return key;
+ return this.key;
}
/**
- * @param owner the owner to set
+ * @param key
*/
public void setKey(EntryKey key) {
this.key = key;
@@ -47,7 +49,7 @@
* @return the value
*/
public Object getValue() {
- return value;
+ return this.value;
}
/**
* @param value the value to set
@@ -60,7 +62,7 @@
* @return the type
*/
public MessageType getType() {
- return type;
+ return this.type;
}
/**
* @param type the type to set
@@ -69,18 +71,81 @@
this.type = type;
}
+ /**
+ * @return the mapUpdate
+ */
+ public boolean isMapUpdate() {
+ return this.mapUpdate;
+ }
+ /**
+ * @param mapUpdate the mapUpdate to set
+ */
+ public void setMapUpdate(boolean mapUpdate) {
+ this.mapUpdate = mapUpdate;
+ }
+
+ /**
+ * @return the expired
+ */
+ public boolean isExpired() {
+ return expired;
+ }
+ /**
+ * @param expired the expired to set
+ */
+ public void setExpired(boolean expired) {
+ this.expired = expired;
+ }
+
+ /**
+ * @return true if insert message
+ */
+ public boolean isInsert() {
+ return this.type != null && this.type.equals(MessageType.INSERT);
+ }
+
+ /**
+ * @return true if delete message
+ */
+ public boolean isDelete() {
+ return this.type != null && this.type.equals(MessageType.DELETE);
+ }
+
+ public boolean isSync() {
+ return this.type != null && this.type.equals(MessageType.SYNC);
+ }
+
+ public EntryMessage copy() {
+ EntryMessage result = new EntryMessage();
+ result.key=this.key;
+ result.value=this.value;
+ result.type=this.type;
+ result.mapUpdate=this.mapUpdate;
+ result.expired=this.expired;
+ return result;
+ }
+
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
this.key=(EntryKey) in.readObject();
this.value=in.readObject();
- this.type=(MessageType) in.readObject();
+ this.type=(MessageType) in.readObject();
+ this.mapUpdate=in.readBoolean();
+ this.expired=in.readBoolean();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(this.key);
out.writeObject(this.value);
out.writeObject(this.type);
+ out.writeBoolean(this.mapUpdate);
+ out.writeBoolean(this.expired);
+ }
+
+ public String toString() {
+ return "EntryMessage: "+this.type + "[" + this.key + "," + this.value +
+ "]{update=" + this.mapUpdate + "}";
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/EntryValue.java Wed Aug 6 06:25:27 2008
@@ -45,6 +45,14 @@
return this.value;
}
+ /**
+ * set the value
+ * @param value
+ */
+ public void setValue(V value) {
+ this.value=value;
+ }
+
public int hashCode() {
return this.value != null ? this.value.hashCode() : super.hashCode();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMap.java Wed Aug 6 06:25:27 2008
@@ -27,6 +27,9 @@
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -46,40 +49,60 @@
import org.apache.activemq.advisory.ConsumerListener;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.LRUSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-
/**
* <P>
- * A <CODE>GroupMap</CODE> is used to shared state amongst a distributed
- * group. You can restrict ownership of objects inserted into the map,
- * by allowing only the map that inserted the objects to update or remove them
+ * A <CODE>GroupMap</CODE> is a Map implementation that is used to share state
+ * amongst a distributed group of other <CODE>GroupMap</CODE> instances.
+ * Membership of a group is handled automatically using discovery.
+ * <P>
+ * The underlying transport is JMS and there are some optimizations that occur
+ * for membership if used with ActiveMQ - but <CODE>GroupMap</CODE> can be used
+ * with any JMS implementation.
+ *
+ * <P>
+ * Updates to the group shared map are controlled by a coordinator. The
+ * coordinator is elected by the member with the lowest lexicographical id -
+ * based on the bully algorithm [Silberschatz et al. 1993]
+ * <P>
+ * The {@link #selectCordinator(Collection<Member> members)} method may be
+ * overridden to implement a custom mechanism for choosing how the coordinator
+ * is elected for the map.
* <P>
- * Updates to the group shared map are controlled by a co-ordinator.
- * The co-ordinator is chosen by the member with the lowest lexicographical id .
- * <P>The {@link #selectCordinator(Collection<Member> members)} method may be overridden to
- * implement a custom mechanism for choosing the co-ordinator
- * are added to the map.
+ * New <CODE>GroupMap</CODE> instances have their state updated by the coordinator,
+ * and coordinator failure is handled automatically within the group.
* <P>
+ * All updates are totally ordered through the coordinator, whilst read operations
+ * happen locally.
+ * <P>
+ * A <CODE>GroupMap</CODE> supports the concept of owner-only updates (write locks),
+ * shared updates, entry expiration times and removal on owner exit -
+ * all of which are optional.
+ *
+ * <P>
+ *
* @param <K> the key type
* @param <V> the value type
- *
+ *
*/
-public class GroupMap<K, V> implements Map<K, V>,Service{
+public class GroupMap<K, V> implements Map<K, V>, Service {
+ /**
+ * default interval within which to detect a member failure
+ */
+ public static final long DEFAULT_HEART_BEAT_INTERVAL = 2000;
+ private static final long EXPIRATION_SWEEP_INTERVAL = 1000;
private static final Log LOG = LogFactory.getLog(GroupMap.class);
- private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName()+".";
- private static final int HEART_BEAT_INTERVAL = 15000;
+ private static final String STATE_TOPIC_PREFIX = GroupMap.class.getName()
+ + ".";
private final Object mapMutex = new Object();
- private Map<EntryKey<K>,EntryValue<V>> localMap;
- private Map<String,Member> members = new ConcurrentHashMap<String,Member>();
- private Map<String,MapRequest> requests = new HashMap<String,MapRequest>();
- private List <MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
- private List <MapChangedListener> mapChangedListeners = new CopyOnWriteArrayList<MapChangedListener>();
- private LRUSet<Message>mapUpdateReplies = new LRUSet<Message>();
- private Member local;
+ private Map<EntryKey<K>, EntryValue<V>> localMap;
+ private Map<String, Member> members = new ConcurrentHashMap<String, Member>();
+ private Map<String, MapRequest> requests = new HashMap<String, MapRequest>();
+ private List<MemberChangedListener> membershipListeners = new CopyOnWriteArrayList<MemberChangedListener>();
+ private List<MapChangedListener> mapChangedListeners = new CopyOnWriteArrayList<MapChangedListener>();
+ Member local;
private Member coordinator;
private String groupName;
private boolean sharedWrites;
@@ -93,93 +116,122 @@
private AtomicBoolean started = new AtomicBoolean();
private SchedulerTimerTask heartBeatTask;
private SchedulerTimerTask checkMembershipTask;
- private Timer heartBeatTimer;
- private int heartBeatInterval = HEART_BEAT_INTERVAL;
- private IdGenerator requestGenerator = new IdGenerator();
+ private SchedulerTimerTask expirationTask;
+ private Timer timer;
+ private long heartBeatInterval = DEFAULT_HEART_BEAT_INTERVAL;
+ private IdGenerator idGenerator = new IdGenerator();
private boolean removeOwnedObjectsOnExit;
-
+ private int timeToLive;
+ private int minimumGroupSize = 1;
+ private final AtomicBoolean electionFinished = new AtomicBoolean(true);
+ private ExecutorService executor;
+ private final Object memberMutex = new Object();
+
/**
* @param connection
* @param name
*/
- public GroupMap(Connection connection,String name) {
- this(connection,"default",name);
+ public GroupMap(Connection connection, String name) {
+ this(connection, "default", name);
}
-
+
/**
* @param connection
* @param groupName
* @param name
*/
- public GroupMap(Connection connection,String groupName,String name) {
+ public GroupMap(Connection connection, String groupName, String name) {
this.connection = connection;
this.local = new Member(name);
- this.coordinator=this.local;
- this.groupName=groupName;
+ this.coordinator = this.local;
+ this.groupName = groupName;
}
-
+
/**
- * Set the local map implementation to be used
- * By default its a HashMap - but you could use a Cache for example
+ * Set the local map implementation to be used. By default it's a HashMap -
+ * but you could use a Cache for example
+ *
* @param map
*/
public void setLocalMap(Map map) {
- synchronized(this.mapMutex) {
- this.localMap=map;
+ synchronized (this.mapMutex) {
+ this.localMap = map;
}
}
-
+
/**
* Start membership to the group
- * @throws Exception
+ *
+ * @throws Exception
*
*/
public void start() throws Exception {
- if(this.started.compareAndSet(false, true)) {
- synchronized(this.mapMutex) {
- if (this.localMap==null) {
- this.localMap= new HashMap<EntryKey<K>, EntryValue<V>>();
+ if (this.started.compareAndSet(false, true)) {
+ synchronized (this.mapMutex) {
+ if (this.localMap == null) {
+ this.localMap = new HashMap<EntryKey<K>, EntryValue<V>>();
}
}
this.connection.start();
- this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ this.session = this.connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
this.producer = this.session.createProducer(null);
this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
this.inboxTopic = this.session.createTemporaryTopic();
- String topicName = STATE_TOPIC_PREFIX+this.groupName;
+ String topicName = STATE_TOPIC_PREFIX + this.groupName;
this.topic = this.session.createTopic(topicName);
- this.heartBeatTopic = this.session.createTopic(topicName+".heartbeat");
- MessageConsumer privateInbox = this.session.createConsumer(this.inboxTopic);
- privateInbox.setMessageListener(new MessageListener(){
+ this.heartBeatTopic = this.session.createTopic(topicName
+ + ".heartbeat");
+ MessageConsumer privateInbox = this.session
+ .createConsumer(this.inboxTopic);
+ privateInbox.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
- handleResponses(message);
+ processMessage(message);
}
});
- ActiveMQMessageConsumer mapChangeConsumer = (ActiveMQMessageConsumer) this.session.createConsumer(this.topic);
- mapChangeConsumer.setMessageListener(new MessageListener(){
+ MessageConsumer mapChangeConsumer = this.session
+ .createConsumer(this.topic);
+ mapChangeConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
- handleMapUpdates(message);
+ processMessage(message);
}
});
-
- MessageConsumer heartBeatConsumer = this.session.createConsumer(this.heartBeatTopic);
+ MessageConsumer heartBeatConsumer = this.session
+ .createConsumer(this.heartBeatTopic);
heartBeatConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
handleHeartbeats(message);
- }
+ }
});
-
- this.consumerEvents = new ConsumerEventSource(this.connection, this.topic);
+ this.consumerEvents = new ConsumerEventSource(this.connection,
+ this.topic);
this.consumerEvents.setConsumerListener(new ConsumerListener() {
public void onConsumerEvent(ConsumerEvent event) {
handleConsumerEvents(event);
- }
+ }
});
this.consumerEvents.start();
- this.local.setId(mapChangeConsumer.getConsumerId().toString());
+ String memberId = null;
+ if (mapChangeConsumer instanceof ActiveMQMessageConsumer) {
+ memberId = ((ActiveMQMessageConsumer) mapChangeConsumer)
+ .getConsumerId().toString();
+ } else {
+ memberId = this.idGenerator.generateId();
+ }
+ this.local.setId(memberId);
this.local.setInBoxDestination(this.inboxTopic);
+ this.executor = Executors
+ .newSingleThreadExecutor(new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "Election{"
+ + GroupMap.this.local + "}");
+ thread.setDaemon(true);
+ thread.setPriority(Thread.NORM_PRIORITY);
+ return thread;
+ }
+ });
sendHeartBeat();
- this.heartBeatTask = new SchedulerTimerTask (new Runnable() {
+ this.heartBeatTask = new SchedulerTimerTask(new Runnable() {
public void run() {
sendHeartBeat();
}
@@ -189,33 +241,76 @@
checkMembership();
}
});
- this.heartBeatTimer = new Timer("Distributed heart beat",true);
- this.heartBeatTimer.scheduleAtFixedRate(this.heartBeatTask, getHeartBeatInterval()/3, getHeartBeatInterval()/2);
+ this.expirationTask = new SchedulerTimerTask(new Runnable() {
+ public void run() {
+ expirationSweep();
+ }
+ });
+ this.timer = new Timer("Distributed heart beat", true);
+ this.timer.scheduleAtFixedRate(this.heartBeatTask,
+ getHeartBeatInterval() / 3, getHeartBeatInterval() / 2);
+ this.timer.scheduleAtFixedRate(this.checkMembershipTask,
+ getHeartBeatInterval(), getHeartBeatInterval());
+ this.timer.scheduleAtFixedRate(this.expirationTask,
+ EXPIRATION_SWEEP_INTERVAL, EXPIRATION_SWEEP_INTERVAL);
+ // wait for members to join
+ long timeout = this.heartBeatInterval * this.minimumGroupSize;
+ long deadline = System.currentTimeMillis() + timeout;
+ while (this.members.size() < this.minimumGroupSize && timeout > 0) {
+ synchronized (this.memberMutex) {
+ this.memberMutex.wait(timeout);
+ }
+ timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+ }
}
}
/**
* stop membership to the group
- * @throws Exception
+ *
+ * @throws Exception
*/
- public void stop() throws Exception {
+ public void stop() {
if (this.started.compareAndSet(true, false)) {
+ this.expirationTask.cancel();
this.checkMembershipTask.cancel();
this.heartBeatTask.cancel();
- this.heartBeatTimer.purge();
- this.consumerEvents.stop();
- this.session.close();
+ this.expirationTask.cancel();
+ this.timer.purge();
+ if (this.executor != null) {
+ this.executor.shutdownNow();
+ }
+ try {
+ this.consumerEvents.stop();
+ this.session.close();
+ } catch (Exception e) {
+ LOG.debug("Caught exception stopping", e);
+ }
}
}
-
+
+ /**
+ * @return true if the election has finished
+ */
+ public boolean isElectionFinished() {
+ return this.electionFinished.get();
+ }
+
/**
* @return the partitionName
*/
public String getGroupName() {
return this.groupName;
}
-
+
+ /**
+ * @return the name of this map
+ */
+ public String getName() {
+ return this.local.getName();
+ }
+
/**
* @return the sharedWrites
*/
@@ -224,112 +319,146 @@
}
/**
- * @param sharedWrites the sharedWrites to set
+ * @param sharedWrites
+ * the sharedWrites to set
*/
public void setSharedWrites(boolean sharedWrites) {
this.sharedWrites = sharedWrites;
}
-
+
/**
* @return the heartBeatInterval
*/
- public int getHeartBeatInterval() {
+ public long getHeartBeatInterval() {
return this.heartBeatInterval;
}
/**
- * @param heartBeatInterval the heartBeatInterval to set
+ * @param heartBeatInterval
+ * the heartBeatInterval to set
*/
- public void setHeartBeatInterval(int heartBeatInterval) {
+ public void setHeartBeatInterval(long heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
}
-
-
+
/**
* @param l
*/
public void addMemberChangedListener(MemberChangedListener l) {
this.membershipListeners.add(l);
}
-
+
/**
* @param l
*/
public void removeMemberChangedListener(MemberChangedListener l) {
this.membershipListeners.remove(l);
}
-
+
/**
* @param l
*/
public void addMapChangedListener(MapChangedListener l) {
this.mapChangedListeners.add(l);
}
-
+
/**
* @param l
*/
public void removeMapChangedListener(MapChangedListener l) {
this.mapChangedListeners.remove(l);
}
-
+
+ /**
+ * @return the timeToLive
+ */
+ public int getTimeToLive() {
+ return this.timeToLive;
+ }
+
+ /**
+ * @param timeToLive
+ * the timeToLive to set
+ */
+ public void setTimeToLive(int timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
/**
* @return the removeOwnedObjectsOnExit
*/
public boolean isRemoveOwnedObjectsOnExit() {
- return removeOwnedObjectsOnExit;
+ return this.removeOwnedObjectsOnExit;
}
/**
- * Sets the policy for owned objects in the group
- * If set to true, when this <code>GroupMap<code> stops,
+ * Sets the policy for owned objects in the group. If set to true, when this
+ * <code>GroupMap</code> stops,
* any objects it owns will be removed from the group map
* @param removeOwnedObjectsOnExit the removeOwnedObjectsOnExit to set
*/
public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
}
-
+
+ /**
+ * @return the minimumGroupSize
+ */
+ public int getMinimumGroupSize() {
+ return this.minimumGroupSize;
+ }
+
+ /**
+ * @param minimumGroupSize
+ * the minimumGroupSize to set
+ */
+ public void setMinimumGroupSize(int minimumGroupSize) {
+ this.minimumGroupSize = minimumGroupSize;
+ }
+
/**
* clear entries from the Map
- * @throws IllegalStateException
+ *
+ * @throws IllegalStateException
*/
public void clear() throws IllegalStateException {
- checkStarted();
- if(this.localMap != null && !this.localMap.isEmpty()) {
+ checkStatus();
+ if (this.localMap != null && !this.localMap.isEmpty()) {
Set<EntryKey<K>> keys = null;
- synchronized(this.mapMutex) {
+ synchronized (this.mapMutex) {
keys = new HashSet<EntryKey<K>>(this.localMap.keySet());
- this.localMap.clear();
}
-
- for(EntryKey<K>key:keys) {
+ for (EntryKey<K> key : keys) {
remove(key);
}
- }
+ }
+ this.localMap.clear();
}
-
-
+
public boolean containsKey(Object key) {
- EntryKey stateKey = new EntryKey(this.local,key);
- synchronized(this.mapMutex) {
- return this.localMap != null ? this.localMap.containsKey(stateKey):false;
+ EntryKey stateKey = new EntryKey(this.local, key);
+ synchronized (this.mapMutex) {
+ return this.localMap != null ? this.localMap.containsKey(stateKey)
+ : false;
}
}
public boolean containsValue(Object value) {
- EntryValue entryValue = new EntryValue(this.local,value);
- synchronized(this.mapMutex) {
- return this.localMap != null ? this.localMap.containsValue(entryValue):false;
+ EntryValue entryValue = new EntryValue(this.local, value);
+ synchronized (this.mapMutex) {
+ return this.localMap != null ? this.localMap
+ .containsValue(entryValue) : false;
}
}
public Set<java.util.Map.Entry<K, V>> entrySet() {
- Map<K,V>result = new HashMap<K,V>();
- synchronized(this.mapMutex) {
- if(this.localMap!=null) {
- for(java.util.Map.Entry<EntryKey<K>,EntryValue<V>>entry:this.localMap.entrySet()) {
- result.put(entry.getKey().getKey(),entry.getValue().getValue());
+ Map<K, V> result = new HashMap<K, V>();
+ synchronized (this.mapMutex) {
+ if (this.localMap != null) {
+ for (java.util.Map.Entry<EntryKey<K>, EntryValue<V>> entry : this.localMap
+ .entrySet()) {
+ result.put(entry.getKey().getKey(), entry.getValue()
+ .getValue());
}
}
}
@@ -337,150 +466,170 @@
}
public V get(Object key) {
- EntryKey<K> stateKey = new EntryKey<K>(this.local,(K) key);
- EntryValue<V> value = null;
- synchronized(this.mapMutex) {
- value = this.localMap != null ?this.localMap.get(stateKey):null;
- }
- return value != null ? value.getValue() : null;
+ EntryKey<K> stateKey = new EntryKey<K>(this.local, (K) key);
+ EntryValue<V> value = null;
+ synchronized (this.mapMutex) {
+ value = this.localMap != null ? this.localMap.get(stateKey) : null;
+ }
+ return value != null ? value.getValue() : null;
}
public boolean isEmpty() {
- synchronized(this.mapMutex) {
- return this.localMap != null ? this.localMap.isEmpty():true;
+ synchronized (this.mapMutex) {
+ return this.localMap != null ? this.localMap.isEmpty() : true;
}
}
public Set<K> keySet() {
- Set <K>result = new HashSet<K>();
- synchronized(this.mapMutex) {
- if(this.localMap!=null) {
- for (EntryKey<K> key:this.localMap.keySet()) {
+ Set<K> result = new HashSet<K>();
+ synchronized (this.mapMutex) {
+ if (this.localMap != null) {
+ for (EntryKey<K> key : this.localMap.keySet()) {
result.add(key.getKey());
}
}
}
return result;
}
+
/**
* Puts an value into the map associated with the key
- * @param key
- * @param value
+ *
+ * @param key
+ * @param value
* @return the old value or null
- * @throws IllegalAccessException
- * @throws IllegalStateException
+ * @throws GroupMapUpdateException
+ * @throws IllegalStateException
*
*/
- public V put(K key, V value) throws IllegalAccessException,IllegalStateException{
- return put(key,value,isSharedWrites(),isRemoveOwnedObjectsOnExit());
+ public V put(K key, V value) throws GroupMapUpdateException,
+ IllegalStateException {
+ return put(key, value, isSharedWrites(), isRemoveOwnedObjectsOnExit(),
+ getTimeToLive());
}
-
+
/**
* Puts an value into the map associated with the key
- * @param key
- * @param value
- * @param sharedWrites
- * @param removeOnExit
+ *
+ * @param key
+ * @param value
+ * @param sharedWrites
+ * @param removeOnExit
+ * @param timeToLive
* @return the old value or null
- * @throws IllegalAccessException
- * @throws IllegalStateException
+ * @throws GroupMapUpdateException
+ * @throws IllegalStateException
*
*/
- public V put(K key, V value,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException, IllegalStateException{
- checkStarted();
- EntryKey<K>entryKey = new EntryKey<K>(this.local,key);
- EntryValue<V>stateValue = new EntryValue<V>(this.local,value);
+ public V put(K key, V value, boolean sharedWrites, boolean removeOnExit,
+ long timeToLive) throws GroupMapUpdateException,
+ IllegalStateException {
+ checkStatus();
+ EntryKey<K> entryKey = new EntryKey<K>(this.local, key);
+ EntryValue<V> stateValue = new EntryValue<V>(this.local, value);
entryKey.setShare(sharedWrites);
entryKey.setRemoveOnExit(removeOnExit);
+ entryKey.setTimeToLive(timeToLive);
EntryMessage entryMsg = new EntryMessage();
entryMsg.setKey(entryKey);
entryMsg.setValue(value);
entryMsg.setType(EntryMessage.MessageType.INSERT);
- return sendEntryMessage(entryMsg);
+ return (V) sendRequest(getCoordinator(), entryMsg);
}
-
+
/**
* Add the Map to the distribution
+ *
* @param t
- * @throws IllegalAccessException
+ * @throws GroupMapUpdateException
* @throws IllegalStateException
*/
- public void putAll(Map<? extends K, ? extends V> t) throws IllegalAccessException,IllegalStateException {
- putAll(t,isSharedWrites(),isRemoveOwnedObjectsOnExit());
+ public void putAll(Map<? extends K, ? extends V> t)
+ throws GroupMapUpdateException, IllegalStateException {
+ putAll(t, isSharedWrites(), isRemoveOwnedObjectsOnExit(),
+ getTimeToLive());
}
/**
* Add the Map to the distribution
+ *
* @param t
* @param sharedWrites
* @param removeOnExit
- * @throws IllegalAccessException
+ * @param timeToLive
+ * @throws GroupMapUpdateException
* @throws IllegalStateException
*/
- public void putAll(Map<? extends K, ? extends V> t,boolean sharedWrites,boolean removeOnExit) throws IllegalAccessException,IllegalStateException {
- for(java.util.Map.Entry<? extends K, ? extends V>entry:t.entrySet()) {
- put(entry.getKey(),entry.getValue(),sharedWrites,removeOnExit);
- }
+ public void putAll(Map<? extends K, ? extends V> t, boolean sharedWrites,
+ boolean removeOnExit, long timeToLive)
+ throws GroupMapUpdateException, IllegalStateException {
+ for (java.util.Map.Entry<? extends K, ? extends V> entry : t.entrySet()) {
+ put(entry.getKey(), entry.getValue(), sharedWrites, removeOnExit,
+ timeToLive);
+ }
}
/**
* remove a value from the map associated with the key
- * @param key
+ *
+ * @param key
* @return the Value or null
- * @throws IllegalAccessException
- * @throws IllegalStateException
+ * @throws GroupMapUpdateException
+ * @throws IllegalStateException
*
*/
- public V remove(Object key) throws IllegalAccessException,IllegalStateException{
- EntryKey<K> entryKey = new EntryKey<K>(this.local,(K) key);
+ public V remove(Object key) throws GroupMapUpdateException,
+ IllegalStateException {
+ EntryKey<K> entryKey = new EntryKey<K>(this.local, (K) key);
return remove(entryKey);
}
-
- V remove(EntryKey<K> key) throws IllegalAccessException,IllegalStateException{
- checkStarted();
+
+ V remove(EntryKey<K> key) throws GroupMapUpdateException,
+ IllegalStateException {
+ checkStatus();
EntryMessage entryMsg = new EntryMessage();
entryMsg.setKey(key);
entryMsg.setType(EntryMessage.MessageType.DELETE);
- return sendEntryMessage(entryMsg);
+ return (V) sendRequest(getCoordinator(), entryMsg);
}
-
-
+
public int size() {
- synchronized(this.mapMutex) {
- return this.localMap != null ? this.localMap.size():0;
+ synchronized (this.mapMutex) {
+ return this.localMap != null ? this.localMap.size() : 0;
}
}
public Collection<V> values() {
List<V> result = new ArrayList<V>();
- synchronized(this.mapMutex) {
- if(this.localMap!=null) {
- for (EntryValue<V> value:this.localMap.values()) {
+ synchronized (this.mapMutex) {
+ if (this.localMap != null) {
+ for (EntryValue<V> value : this.localMap.values()) {
result.add(value.getValue());
}
}
}
return result;
}
-
+
/**
- * @return a set of the members
+ * @return a set of the members
*/
- public Set<Member> members(){
- Set<Member>result = new HashSet<Member>();
+ public Set<Member> members() {
+ Set<Member> result = new HashSet<Member>();
result.addAll(this.members.values());
return result;
}
-
+
/**
* @param key
* @return true if this is the owner of the key
*/
public boolean isOwner(K key) {
- EntryKey<K> stateKey = new EntryKey<K>(this.local,key);
+ EntryKey<K> stateKey = new EntryKey<K>(this.local, key);
EntryValue<V> entryValue = null;
- synchronized(this.mapMutex) {
- entryValue = this.localMap != null ? this.localMap.get(stateKey):null;
+ synchronized (this.mapMutex) {
+ entryValue = this.localMap != null ? this.localMap.get(stateKey)
+ : null;
}
boolean result = false;
if (entryValue != null) {
@@ -488,102 +637,152 @@
}
return result;
}
-
+
/**
* Get the owner of a key
+ *
* @param key
* @return the owner - or null if the key doesn't exist
*/
public Member getOwner(K key) {
- EntryKey<K> stateKey = new EntryKey<K>(this.local,key);
+ EntryKey<K> stateKey = new EntryKey<K>(this.local, key);
EntryValue<V> entryValue = null;
- synchronized(this.mapMutex) {
- entryValue = this.localMap != null ? this.localMap.get(stateKey):null;
+ synchronized (this.mapMutex) {
+ entryValue = this.localMap != null ? this.localMap.get(stateKey)
+ : null;
}
- return entryValue != null ? entryValue.getOwner():null;
+ return entryValue != null ? entryValue.getOwner() : null;
}
-
+
/**
* @return true if the coordinator for the map
*/
public boolean isCoordinator() {
return this.local.equals(this.coordinator);
}
-
+
+ /**
+ * @return the coordinator
+ */
+ public Member getCoordinator() {
+ return this.coordinator;
+ }
+
/**
- * Select a coordinator - by default, its the member with
- * the lowest lexicographical id
+ * Select a coordinator - by default, it's the member with the highest
+ * lexicographical id
+ *
* @param members
* @return
*/
- protected Member selectCordinator(Collection<Member>members) {
+ protected Member selectCordinator(Collection<Member> members) {
Member result = this.local;
- for (Member member:members) {
+ for (Member member : members) {
if (result.getId().compareTo(member.getId()) < 0) {
result = member;
}
}
- return result;
+ return result;
}
-
- V sendEntryMessage(EntryMessage entry) {
+
+ Object sendRequest(Member member, Serializable payload) {
Object result = null;
MapRequest request = new MapRequest();
- String id = this.requestGenerator.generateId();
- synchronized(this.requests) {
- this.requests.put(id,request);
+ String id = this.idGenerator.generateId();
+ synchronized (this.requests) {
+ this.requests.put(id, request);
}
try {
- ObjectMessage objMsg = this.session.createObjectMessage(entry);
+ ObjectMessage objMsg = this.session.createObjectMessage(payload);
objMsg.setJMSReplyTo(this.inboxTopic);
objMsg.setJMSCorrelationID(id);
- this.producer.send(this.topic, objMsg);
- result = request.get(getHeartBeatInterval()*2);
- }catch(JMSException e) {
- if(this.started.get()) {
- LOG.error("Failed to send EntryMessage " + entry,e);
+ this.producer.send(member.getInBoxDestination(), objMsg);
+ result = request.get(getHeartBeatInterval() * 200000);
+ } catch (JMSException e) {
+ if (this.started.get()) {
+ LOG.error("Failed to send request " + payload, e);
}
}
- if (result instanceof IllegalAccessException) {
- throw (IllegalAccessException)result;
+ if (result instanceof GroupMapUpdateException) {
+ throw (GroupMapUpdateException) result;
}
- return (V) result;
+ return result;
}
-
- void handleResponses(Message message) {
+
+ void sendAsyncRequest(AsyncMapRequest asyncRequest, Member member,
+ Serializable payload) {
+ MapRequest request = new MapRequest();
+ String id = this.idGenerator.generateId();
+ asyncRequest.add(id, request);
+ synchronized (this.requests) {
+ this.requests.put(id, request);
+ }
+ try {
+ ObjectMessage objMsg = this.session.createObjectMessage(payload);
+ objMsg.setJMSReplyTo(this.inboxTopic);
+ objMsg.setJMSCorrelationID(id);
+ this.producer.send(member.getInBoxDestination(), objMsg);
+ } catch (JMSException e) {
+ if (this.started.get()) {
+ LOG.error("Failed to send async request " + payload, e);
+ }
+ }
+ }
+
+ void sendReply(Object reply, Destination replyTo, String id) {
+ try {
+ ObjectMessage replyMsg = this.session
+ .createObjectMessage((Serializable) reply);
+ replyMsg.setJMSCorrelationID(id);
+ this.producer.send(replyTo, replyMsg);
+ } catch (JMSException e) {
+ LOG.error("Couldn't send reply from co-ordinator", e);
+ }
+ }
+
+ void broadcastMapUpdate(EntryMessage entry, String correlationId) {
+ try {
+ EntryMessage copy = entry.copy();
+ copy.setMapUpdate(true);
+ ObjectMessage objMsg = this.session.createObjectMessage(copy);
+ objMsg.setJMSCorrelationID(correlationId);
+ this.producer.send(this.topic, objMsg);
+ } catch (JMSException e) {
+ if (this.started.get()) {
+ LOG.error("Failed to send EntryMessage " + entry, e);
+ }
+ }
+ }
+
+ void processMessage(Message message) {
if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) message;
try {
+ String id = objMsg.getJMSCorrelationID();
+ Destination replyTo = objMsg.getJMSReplyTo();
Object payload = objMsg.getObject();
if (payload instanceof Member) {
- handleHeartbeats((Member)payload);
- } else if(payload instanceof EntryMessage) {
+ handleHeartbeats((Member) payload);
+ } else if (payload instanceof EntryMessage) {
EntryMessage entryMsg = (EntryMessage) payload;
- EntryKey<K>key=entryMsg.getKey();
- EntryValue<V> value = new EntryValue<V>(key.getOwner(),(V) entryMsg.getValue());
-
- if(this.localMap !=null) {
- boolean fireUpdate = false;
- synchronized(this.mapMutex) {
- if(!this.localMap.containsKey(key)) {
- this.localMap.put(key,value);
- fireUpdate=true;
- }
- }
- if(fireUpdate) {
- fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue());
- }
+ entryMsg = entryMsg.copy();
+ if (entryMsg.isMapUpdate()) {
+ processMapUpdate(entryMsg);
+ } else {
+ processEntryMessage(entryMsg, replyTo, id);
}
-
-
- }else {
- String id = objMsg.getJMSCorrelationID();
+ } else if (payload instanceof ElectionMessage) {
+ ElectionMessage electionMsg = (ElectionMessage) payload;
+ electionMsg = electionMsg.copy();
+ processElectionMessage(electionMsg, replyTo, id);
+ }
+ if (id != null) {
MapRequest result = null;
synchronized (this.requests) {
result = this.requests.remove(id);
}
if (result != null) {
- result.put(objMsg.getObject());
+ result.put(id, objMsg.getObject());
}
}
} catch (JMSException e) {
@@ -591,83 +790,92 @@
}
}
}
-
- void handleMapUpdates(Message message) {
- Object reply = null;
- if (message instanceof ObjectMessage) {
- try {
- ObjectMessage objMsg = (ObjectMessage) message;
- EntryMessage entryMsg = (EntryMessage) objMsg.getObject();
- EntryKey<K> key = entryMsg.getKey();
- EntryValue<V> value = new EntryValue<V>(key.getOwner(),(V) entryMsg.getValue());
- boolean containsKey=false;
- boolean mapExists = false;
- synchronized(this.mapMutex) {
- mapExists = this.localMap!=null;
- if(mapExists) {
- containsKey=this.localMap.containsKey(key);
- }
- }
- if(mapExists) {
- if (containsKey) {
- Member owner = getOwner((K) key.getKey());
- if (owner.equals(key.getOwner()) && !key.isShare()) {
- EntryValue<V> old = null;
- if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) {
- synchronized(this.mapMutex) {
- old = this.localMap.put(key, value);
- }
- }else {
- synchronized(this.mapMutex) {
- old = this.localMap.remove(key);
- }
- }
- fireMapChanged(owner, key.getKey(), old.getValue(), value.getValue());
- }else {
- reply = new IllegalAccessException("Owned by "+ owner);
+
+ void processEntryMessage(EntryMessage entryMsg, Destination replyTo,
+ String correlationId) {
+ if (isCoordinator()) {
+ EntryKey<K> key = entryMsg.getKey();
+ EntryValue<V> value = new EntryValue<V>(key.getOwner(),
+ (V) entryMsg.getValue());
+ boolean insert = entryMsg.isInsert();
+ boolean containsKey = false;
+ synchronized (this.mapMutex) {
+ containsKey = this.localMap.containsKey(key);
+ }
+ if (containsKey) {
+ Member owner = getOwner((K) key.getKey());
+ if (owner.equals(key.getOwner()) || key.isShare()) {
+ EntryValue<V> old = null;
+ if (insert) {
+ synchronized (this.mapMutex) {
+ old = this.localMap.put(key, value);
}
- }else {
- if (entryMsg.getType().equals(EntryMessage.MessageType.INSERT)) {
- synchronized(this.mapMutex) {
- this.localMap.put(key, value);
- }
- fireMapChanged(key.getOwner(), key.getKey(), null, value.getValue());
+ } else {
+ synchronized (this.mapMutex) {
+ old = this.localMap.remove(key);
}
}
+ broadcastMapUpdate(entryMsg, correlationId);
+ fireMapChanged(owner, key.getKey(), old.getValue(), value
+ .getValue(), false);
+ } else {
+ Serializable reply = new GroupMapUpdateException(
+ "Owned by " + owner);
+ sendReply(reply, replyTo, correlationId);
+ }
+ } else {
+ if (insert) {
+ synchronized (this.mapMutex) {
+ this.localMap.put(key, value);
+ }
+ broadcastMapUpdate(entryMsg, correlationId);
+ fireMapChanged(key.getOwner(), key.getKey(), null, value
+ .getValue(), false);
+ } else {
+ sendReply(null, replyTo, correlationId);
}
- } catch (JMSException e) {
- LOG.warn("Failed to process map update",e);
- reply = e;
}
-
- try {
- Destination replyTo = message.getJMSReplyTo();
- String correlationId = message.getJMSCorrelationID();
- ObjectMessage replyMsg = this.session
- .createObjectMessage((Serializable) reply);
- replyMsg.setJMSCorrelationID(correlationId);
- // reuse timestamp - this will be cleared by the producer on
- // send
- replyMsg.setJMSTimestamp(System.currentTimeMillis());
- if (isCoordinator()) {
- this.producer.send(replyTo, replyMsg);
- }else {
- synchronized(mapUpdateReplies) {
- this.mapUpdateReplies.add(replyMsg);
+ }
+ }
+
+ void processMapUpdate(EntryMessage entryMsg) {
+ boolean containsKey = false;
+ EntryKey<K> key = entryMsg.getKey();
+ EntryValue<V> value = new EntryValue<V>(key.getOwner(), (V) entryMsg
+ .getValue());
+ boolean insert = entryMsg.isInsert()||entryMsg.isSync();
+ synchronized (this.mapMutex) {
+ containsKey = this.localMap.containsKey(key);
+ }
+
+ if (!isCoordinator()||entryMsg.isSync()) {
+ if (containsKey) {
+ Member owner = getOwner((K) key.getKey());
+ EntryValue<V> old = null;
+ if (insert) {
+ synchronized (this.mapMutex) {
+ old = this.localMap.put(key, value);
+ }
+ } else {
+ synchronized (this.mapMutex) {
+ old = this.localMap.remove(key);
+ value.setValue(null);
}
}
- } catch (JMSException e) {
- if(this.started.get()) {
- LOG.error("Failed to send response to a map update ", e);
+ fireMapChanged(owner, key.getKey(), old.getValue(), value
+ .getValue(), entryMsg.isExpired());
+ } else {
+ if (insert) {
+ synchronized (this.mapMutex) {
+ this.localMap.put(key, value);
+ }
+ fireMapChanged(key.getOwner(), key.getKey(), null, value
+ .getValue(), false);
}
}
-
-
- }else {
- LOG.warn("Unexpected map update message " + message);
}
}
-
+
void handleHeartbeats(Message message) {
try {
if (message instanceof ObjectMessage) {
@@ -685,63 +893,99 @@
void handleHeartbeats(Member member) {
member.setTimeStamp(System.currentTimeMillis());
if (this.members.put(member.getId(), member) == null) {
+ election(member, true);
fireMemberStarted(member);
if (!member.equals(this.local)) {
- //send the new member our details
sendHeartBeat(member.getInBoxDestination());
- if(isCoordinator()) {
- updateNewMemberMap(member);
- }
+ }
+ synchronized (this.memberMutex) {
+ this.memberMutex.notifyAll();
}
}
}
-
+
void handleConsumerEvents(ConsumerEvent event) {
if (!event.isStarted()) {
- Member member = this.members.remove(event.getConsumerId().toString());
- if(member!=null) {
+ Member member = this.members.remove(event.getConsumerId()
+ .toString());
+ if (member != null) {
fireMemberStopped(member);
- doElection();
+ election(member, false);
}
}
}
-
+
void checkMembership() {
- if (this.started.get()) {
- long checkTime = System.currentTimeMillis()-getHeartBeatInterval();
+ if (this.started.get() && this.electionFinished.get()) {
+ long checkTime = System.currentTimeMillis()
+ - getHeartBeatInterval();
boolean doElection = false;
for (Member member : this.members.values()) {
- if (member.getTimeStamp()<checkTime) {
+ if (member.getTimeStamp() < checkTime) {
LOG.info("Member timestamp expired " + member);
this.members.remove(member.getId());
fireMemberStopped(member);
- doElection=true;
-
+ doElection = true;
}
}
if (doElection) {
- doElection();
+ election(null, false);
}
}
- //clear down cached reply messages
- long checkTime = System.currentTimeMillis()-(getHeartBeatInterval()*2);
- List<Message> tmpList = new ArrayList<Message>();
- synchronized(this.mapUpdateReplies) {
- try {
- for(Message msg:this.mapUpdateReplies) {
- if (msg.getJMSTimestamp() < checkTime) {
- tmpList.add(msg);
+ }
+
+ void expirationSweep() {
+ if (isCoordinator() && this.started.get() && this.electionFinished.get()) {
+ List<EntryKey> list = null;
+ synchronized (this.mapMutex) {
+ Map<EntryKey<K>, EntryValue<V>> map = this.localMap;
+ if (map != null) {
+ long currentTime = System.currentTimeMillis();
+ for (EntryKey k : map.keySet()) {
+ if (k.isExpired(currentTime)) {
+ if (list == null) {
+ list = new ArrayList<EntryKey>();
+ list.add(k);
+ }
+ }
}
}
- for(Message msg:tmpList) {
- this.mapUpdateReplies.remove(msg);
- }
- }catch(JMSException e) {
- LOG.warn("Failed to clear down mapUpdateReplies",e);
+ }
+ //do the actual removal of entries in a separate thread
+ if (list != null) {
+ final List<EntryKey> expire = list;
+ this.executor.execute(new Runnable() {
+ public void run() {
+ doExpiration(expire);
+ }
+ });
}
}
+
}
+ void doExpiration(List<EntryKey> list) {
+ if (this.started.get() && this.electionFinished.get()
+ && isCoordinator()) {
+ for (EntryKey k : list) {
+ EntryValue<V> old = null;
+ synchronized (this.mapMutex) {
+ old = this.localMap.remove(k);
+ }
+ if (old != null) {
+ EntryMessage entryMsg = new EntryMessage();
+ entryMsg.setType(EntryMessage.MessageType.DELETE);
+ entryMsg.setExpired(true);
+ entryMsg.setKey(k);
+ entryMsg.setValue(old.getValue());
+ broadcastMapUpdate(entryMsg, "");
+ fireMapChanged(k.getOwner(), k.getKey(), old.getValue(),
+ null, true);
+ }
+ }
+ }
+ }
+
void sendHeartBeat() {
sendHeartBeat(this.heartBeatTopic);
}
@@ -752,14 +996,16 @@
ObjectMessage msg = this.session
.createObjectMessage(this.local);
this.producer.send(destination, msg);
+ } catch (javax.jms.IllegalStateException e) {
+ // ignore - as we are probably stopping
} catch (Throwable e) {
- if(this.started.get()) {
+ if (this.started.get()) {
LOG.warn("Failed to send heart beat", e);
}
}
}
}
-
+
void updateNewMemberMap(Member member) {
List<Map.Entry<EntryKey<K>, EntryValue<V>>> list = new ArrayList<Map.Entry<EntryKey<K>, EntryValue<V>>>();
synchronized (this.mapMutex) {
@@ -775,10 +1021,11 @@
EntryMessage entryMsg = new EntryMessage();
entryMsg.setKey(entry.getKey());
entryMsg.setValue(entry.getValue().getValue());
- entryMsg.setType(EntryMessage.MessageType.INSERT);
+ entryMsg.setType(EntryMessage.MessageType.SYNC);
+ entryMsg.setMapUpdate(true);
ObjectMessage objMsg = this.session
.createObjectMessage(entryMsg);
- if(!member.equals(entry.getKey().getOwner())) {
+ if (!member.equals(entry.getKey().getOwner())) {
this.producer.send(member.getInBoxDestination(), objMsg);
}
}
@@ -788,26 +1035,26 @@
}
}
}
-
+
void fireMemberStarted(Member member) {
- LOG.info(this.local.getName()+" Member started " + member);
+ LOG.info(this.local.getName() + " Member started " + member);
for (MemberChangedListener l : this.membershipListeners) {
l.memberStarted(member);
}
}
-
+
void fireMemberStopped(Member member) {
- LOG.info(this.local.getName()+" Member stopped " + member);
+ LOG.info(this.local.getName() + " Member stopped " + member);
for (MemberChangedListener l : this.membershipListeners) {
l.memberStopped(member);
}
- //remove all entries owned by the stopped member
- List<EntryKey<K>>tmpList = new ArrayList<EntryKey<K>>();
+ // remove all entries owned by the stopped member
+ List<EntryKey<K>> tmpList = new ArrayList<EntryKey<K>>();
boolean mapExists = false;
- synchronized(this.mapMutex) {
- mapExists=this.localMap!=null;
- if(mapExists) {
- for (EntryKey<K> entryKey:this.localMap.keySet()) {
+ synchronized (this.mapMutex) {
+ mapExists = this.localMap != null;
+ if (mapExists) {
+ for (EntryKey<K> entryKey : this.localMap.keySet()) {
if (entryKey.getOwner().equals(member)) {
if (entryKey.isRemoveOnExit()) {
tmpList.add(entryKey);
@@ -816,49 +1063,154 @@
}
}
}
- if(mapExists) {
- for (EntryKey<K> entryKey:tmpList) {
+ if (mapExists) {
+ for (EntryKey<K> entryKey : tmpList) {
EntryValue<V> value = null;
- synchronized(this.mapMutex) {
+ synchronized (this.mapMutex) {
value = this.localMap.remove(entryKey);
}
- fireMapChanged(member, entryKey.getKey(), value.getValue(), null);
+ fireMapChanged(member, entryKey.getKey(), value.getValue(),
+ null,false);
}
}
}
-
- void fireMapChanged(Member owner,Object key, Object oldValue, Object newValue) {
- for (MapChangedListener l:this.mapChangedListeners) {
- l.mapChanged(owner, key, oldValue, newValue);
+
+ void fireMapChanged(final Member owner, final Object key,
+ final Object oldValue, final Object newValue, final boolean expired) {
+ if (this.started.get() && this.executor != null
+ && !this.executor.isShutdown()) {
+ this.executor.execute(new Runnable() {
+ public void run() {
+ doFireMapChanged(owner, key, oldValue, newValue, expired);
+ }
+ });
}
}
-
- void doElection() {
- this.coordinator=selectCordinator(this.members.values());
- if (isCoordinator() && this.started.get()) {
- //send any inflight requests
- List<Message>list = new ArrayList<Message>();
- synchronized(this.mapUpdateReplies) {
- list.addAll(this.mapUpdateReplies);
- this.mapUpdateReplies.clear();
+
+ void doFireMapChanged(Member owner, Object key, Object oldValue,
+ Object newValue, boolean expired) {
+ for (MapChangedListener l : this.mapChangedListeners) {
+ if (oldValue == null) {
+ l.mapInsert(owner, key, newValue);
+ } else if (newValue == null) {
+ l.mapRemove(owner, key, oldValue, expired);
+ } else {
+ l.mapUpdate(owner, key, oldValue, newValue);
}
- try {
- for(Message msg:list) {
- if (this.started.get()) {
- this.producer.send(msg.getJMSReplyTo(), msg);
+ }
+ }
+
+ void election(final Member member, final boolean memberStarted) {
+ if (this.started.get() && this.executor != null
+ && !this.executor.isShutdown()) {
+ this.executor.execute(new Runnable() {
+ public void run() {
+ doElection(member, memberStarted);
}
+ });
+ }
+ }
+
+ void doElection(Member member, boolean memberStarted) {
+ if ((member == null || !member.equals(this.local))
+ && this.electionFinished.compareAndSet(true, false)) {
+ boolean wasCoordinator = isCoordinator() && !isEmpty();
+ // call an election
+ while (!callElection())
+ ;
+ List<Member> members = new ArrayList<Member>(this.members.values());
+ this.coordinator = selectCordinator(members);
+ if (isCoordinator()) {
+ broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
}
- }catch(JMSException e) {
- if(this.started.get()) {
- LOG.error("Failed to resend replies",e);
+ if (memberStarted && member != null) {
+ if (wasCoordinator || isCoordinator() && this.started.get()) {
+ updateNewMemberMap(member);
}
}
+ if (!this.electionFinished.get()) {
+ try {
+ synchronized (this.electionFinished) {
+ this.electionFinished.wait(this.heartBeatInterval * 2);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ if (!this.electionFinished.get()) {
+ // we must be the coordinator
+ this.coordinator = this.local;
+ this.electionFinished.set(true);
+ broadcastElectionType(ElectionMessage.MessageType.COORDINATOR);
+ }
}
- }
-
- void checkStarted() throws IllegalStateException{
+ }
+
+ boolean callElection() {
+ List<Member> members = new ArrayList<Member>(this.members.values());
+ AsyncMapRequest request = new AsyncMapRequest();
+ for (Member member : members) {
+ if (this.local.getId().compareTo(member.getId()) < 0) {
+ ElectionMessage msg = new ElectionMessage();
+ msg.setMember(this.local);
+ msg.setType(ElectionMessage.MessageType.ELECTION);
+ sendAsyncRequest(request, member, msg);
+ }
+ }
+ return request.isSuccess(getHeartBeatInterval());
+ }
+
+ void processElectionMessage(ElectionMessage msg, Destination replyTo,
+ String correlationId) {
+ if (msg.isElection()) {
+ msg.setType(ElectionMessage.MessageType.ANSWER);
+ msg.setMember(this.local);
+ sendReply(msg, replyTo, correlationId);
+ } else if (msg.isCoordinator()) {
+ synchronized (this.electionFinished) {
+ this.coordinator = msg.getMember();
+ this.electionFinished.set(true);
+ this.electionFinished.notifyAll();
+ }
+ }
+ }
+
+ void broadcastElectionType(ElectionMessage.MessageType type) {
+ if (started.get()) {
+ try {
+ ElectionMessage msg = new ElectionMessage();
+ msg.setMember(this.local);
+ msg.setType(type);
+ ObjectMessage objMsg = this.session.createObjectMessage(msg);
+ this.producer.send(this.topic, objMsg);
+ } catch (javax.jms.IllegalStateException e) {
+ // ignore - we are stopping
+ } catch (JMSException e) {
+ if (this.started.get()) {
+ LOG.error("Failed to broadcast election message: " + type,
+ e);
+ }
+ }
+ }
+ }
+
+ void checkStatus() throws IllegalStateException {
if (!started.get()) {
- throw new IllegalStateException("GroupMap " + this.local.getName() + " not started");
+ throw new IllegalStateException("GroupMap " + this.local.getName()
+ + " not started");
+ }
+ waitForElection();
+ }
+
+ void waitForElection() {
+ synchronized (this.electionFinished) {
+ while (started.get() && !this.electionFinished.get()) {
+ try {
+ this.electionFinished.wait(1000);
+ } catch (InterruptedException e) {
+ stop();
+ Thread.currentThread().interrupt();
+ }
+ }
}
- }
+ }
}
Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java (from r679849, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java&r1=679849&r2=683259&rev=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/IllegalAccessException.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/GroupMapUpdateException.java Wed Aug 6 06:25:27 2008
@@ -20,13 +20,13 @@
* thrown when updating a key to map that the local client doesn't own
*
*/
-public class IllegalAccessException extends java.lang.IllegalStateException {
+public class GroupMapUpdateException extends RuntimeException {
private static final long serialVersionUID = -7584658017201604560L;
/**
* @param message
*/
- public IllegalAccessException(String message) {
+ public GroupMapUpdateException(String message) {
super(message);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapChangedListener.java Wed Aug 6 06:25:27 2008
@@ -23,11 +23,28 @@
public interface MapChangedListener {
/**
- * Called when the map has changed
+ * Called when a key/value pair is inserted into the map
* @param owner
* @param key
+ * @param value
+ */
+ void mapInsert(Member owner,Object key, Object value);
+
+ /**
+ * Called when a key value is updated in the map
+ * @param owner
+ * @param Key
* @param oldValue
* @param newValue
*/
- void mapChanged(Member owner,Object key, Object oldValue, Object newValue);
+ void mapUpdate(Member owner,Object Key,Object oldValue,Object newValue);
+
+ /**
+ * Called when a key value is removed from the map
+ * @param owner
+ * @param key
+ * @param value
+ * @param expired
+ */
+ void mapRemove(Member owner,Object key, Object value,boolean expired);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java?rev=683259&r1=683258&r2=683259&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/MapRequest.java Wed Aug 6 06:25:27 2008
@@ -25,12 +25,13 @@
public class MapRequest {
private final AtomicBoolean done = new AtomicBoolean();
private Object response;
+ private RequestCallback callback; // optional; put() invokes it only when it is non-null
- Object get(int timeout) {
- synchronized (done) {
- if (done.get() == false && response == null) {
+ Object get(long timeout) { // timeout is passed unchanged to Object.wait(long), so it is in milliseconds
+ synchronized (this.done) {
+ if (this.done.get() == false && this.response == null) { // checked once, not in a loop: any wakeup ends the wait
try {
- done.wait(timeout);
+ this.done.wait(timeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -39,15 +40,23 @@
return this.response;
}
- void put(Object response) {
+ void put(String id,Object response) { // stores the response, wakes waiters, then reports id to the callback
this.response = response;
cancel();
+ RequestCallback callback = this.callback; // read the field once so the null check and the call use the same reference
+ if (callback != null) {
+ callback.finished(id); // runs after cancel() has already notified any thread blocked in get()
+ }
}
void cancel() {
- done.set(true);
- synchronized (done) {
- done.notifyAll();
+ this.done.set(true); // mark the request finished before waking waiters
+ synchronized (this.done) {
+ this.done.notifyAll(); // same monitor that get() waits on
}
}
+
+ void setCallback(RequestCallback callback) {
+ this.callback=callback; // NOTE(review): plain unsynchronized write - presumably set before the request is shared; confirm
+ }
}
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java Wed Aug 6 06:25:27 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+
+/**
+ * Callback used to report that a map request has finished.
+ * MapRequest.put(String, Object) invokes it when a callback has been set on the request.
+ */
+public interface RequestCallback{
+ /**
+ * Optionally called when a request is finished
+ * @param id the identifier that was handed to MapRequest.put for the finished request
+ */
+ void finished(String id);
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/group/RequestCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java?rev=683259&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java Wed Aug 6 06:25:27 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+
+public class GroupMapMemberTest extends TestCase {
+ protected BrokerService broker;
+ protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+ /**
+ * Starts 20 GroupMap instances, each on its own connection, and checks that exactly one
+ * reports itself as coordinator and that a put on the first map is readable from every map.
+ * @throws Exception
+ */
+ public void testGroup() throws Exception {
+
+ int number = 20;
+ List<Connection>connections = new ArrayList<Connection>();
+ List<GroupMap>groupMaps = new ArrayList<GroupMap>();
+ ConnectionFactory factory = createConnectionFactory();
+ for (int i =0; i < number; i++) {
+ Connection connection = factory.createConnection();
+ connection.start();
+ connections.add(connection);
+ GroupMap map = new GroupMap(connection,"map"+i);
+ map.setHeartBeatInterval(20000);
+ if(i ==number-1) { // only the last map to be started is given a minimum group size
+ map.setMinimumGroupSize(number);
+ }
+ map.start();
+ groupMaps.add(map);
+ }
+
+ int coordinator = 0;
+ for (GroupMap map:groupMaps) {
+ if (map.isCoordinator()) {
+ coordinator++;
+ }
+ }
+
+ assertEquals(1,coordinator);
+ groupMaps.get(0).put("key", "value");
+ Thread.sleep(2000); // fixed pause before reading back - presumably to let the put reach the other maps
+ for (GroupMap map:groupMaps) {
+ assertTrue(map.get("key").equals("value"));
+ }
+ for(GroupMap map:groupMaps) {
+ map.stop();
+ }
+ for (Connection connection:connections) {
+ connection.stop(); // NOTE(review): stop() only pauses delivery; the connections are never closed in this test
+ }
+
+ }
+
+
+
+ protected void setUp() throws Exception {
+ if (broker == null) {
+ broker = createBroker();
+ }
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory()throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+ ActiveMQConnection.DEFAULT_BROKER_URL); // NOTE(review): uses the default client URL rather than bindAddress - confirm they match
+ return cf;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ configureBroker(answer);
+ answer.start();
+ return answer;
+ }
+
+ protected void configureBroker(BrokerService answer) throws Exception {
+ answer.setPersistent(false);
+ answer.addConnector(bindAddress);
+ answer.setDeleteAllMessagesOnStartup(true);
+ }
+}
+
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java
------------------------------------------------------------------------------
svn:eol-style = native