You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/12/01 16:32:05 UTC
svn commit: r722095 [1/2] - in /activemq/activemq-blaze/trunk/src:
main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/
main/java/org/apache/activeblaze/group/
main/java/org/apache/activeblaze/impl/processor/ main/java/org/apache...
Author: rajdavies
Date: Mon Dec 1 07:32:04 2008
New Revision: 722095
URL: http://svn.apache.org/viewvc?rev=722095&view=rev
Log:
Added cluster 'state'
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java
- copied, changed from r720544, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java (with props)
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java (with props)
Removed:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java
activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java Mon Dec 1 07:32:04 2008
@@ -28,7 +28,9 @@
AtomicBoolean started = new AtomicBoolean();
public boolean init() throws Exception {
- return this.initialialized.compareAndSet(false, true);
+ boolean result = this.initialialized.compareAndSet(false, true);
+ this.initialialized.set(true);
+ return result;
}
@@ -36,7 +38,9 @@
if (isStarted()) {
stop();
}
- return this.initialialized.compareAndSet(true, false);
+ boolean result = this.initialialized.compareAndSet(true, false);
+ this.initialialized.set(false);
+ return result;
}
@@ -44,15 +48,16 @@
if (!this.initialialized.get()) {
init();
}
- return this.started.compareAndSet(false, true);
+ boolean result = this.started.compareAndSet(false, true);
+ this.started.set(true);
+ return result;
}
public boolean stop() throws Exception {
- if (!isInitialized()) {
- init();
- }
- return this.started.compareAndSet(true, false);
+ boolean result = this.started.compareAndSet(true, false);
+ this.started.set(false);
+ return result;
}
public boolean isStarted() {
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Mon Dec 1 07:32:04 2008
@@ -260,18 +260,4 @@
}
}
}
-
- /**
- * shutdown on gc
- *
- * @throws Throwable
- * @see java.lang.Object#finalize()
- */
- protected void finalize() throws Throwable {
- try {
- shutDown();
- } finally {
- super.finalize();
- }
- }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Mon Dec 1 07:32:04 2008
@@ -16,11 +16,6 @@
*/
package org.apache.activeblaze;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectOutputStream;
import java.security.Key;
import java.util.Collection;
import java.util.Collections;
@@ -28,7 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activeblaze.util.IOUtils;
import org.apache.activeblaze.wire.BlazeData;
import org.apache.activeblaze.wire.BoolType;
import org.apache.activeblaze.wire.BufferType;
@@ -43,9 +38,6 @@
import org.apache.activeblaze.wire.ShortType;
import org.apache.activeblaze.wire.StringType;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.BufferInputStream;
-import org.apache.activemq.protobuf.BufferOutputStream;
-
/**
* A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs. The names are <CODE>String</CODE>
@@ -161,38 +153,23 @@
* @return text the default text
* @throws Exception
*/
- public Object getObject() throws Exception {
- Object result = null;
+ public Object getObject() throws Exception {
Buffer buffer = getBuffer(DEFAULT_OBJECT_PAYLOAD);
- InputStream is = new BufferInputStream(buffer);
- DataInputStream dataIn = new DataInputStream(is);
- ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
- result = objIn.readObject();
- return result;
+ return IOUtils.getObject(buffer);
}
-
+
/**
* Utility method for setting a default <Code>Object</Code> payload
*
* @param payload
*/
-
public void setObject(Object payload) {
- BufferOutputStream bufferOut = new BufferOutputStream(1024);
- DataOutputStream dataOut = new DataOutputStream(bufferOut);
try {
- ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
- objOut.writeObject(payload);
- objOut.flush();
- objOut.reset();
- objOut.close();
- } catch (IOException e) {
+ put(DEFAULT_OBJECT_PAYLOAD, IOUtils.getBuffer(payload));
+ } catch (Exception e) {
throw new BlazeRuntimeException(e);
}
- put(DEFAULT_OBJECT_PAYLOAD, bufferOut.toBuffer());
}
-
-
/**
* Utility method used for when a BlazeMessage is only carrying a String
@@ -481,7 +458,7 @@
throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
}
}
-
+
/**
* Returns a Buffer with the specified name.
*
@@ -500,8 +477,6 @@
throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName());
}
}
-
-
/**
* Returns the value of the object with the specified name.
@@ -689,7 +664,7 @@
this.map.remove(name);
}
}
-
+
/**
* Sets a Buffer value with the specified name into the Map.
*
@@ -728,8 +703,6 @@
put(name, data);
}
-
-
/**
* Find out if the message contains a key This isn't recursive
*
@@ -866,10 +839,17 @@
copy.content = this.content;
}
+ /**
+ * @return the content data
+ */
public BlazeData getContent() {
return this.content;
}
+ /**
+ * Set the content data
+ * @param content
+ */
public void setContent(BlazeData content) {
this.content = content;
}
@@ -1109,6 +1089,4 @@
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
-
-
}
\ No newline at end of file
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java Mon Dec 1 07:32:04 2008
@@ -41,7 +41,7 @@
* @param l
* @throws Exception
*/
- public void addClusterChangedListener(ClusterChangedListener l) throws Exception;
+ public void addMasterChangedListener(MasterChangedListener l) throws Exception;
/**
* Remove a listener for cluster changes
@@ -49,7 +49,7 @@
* @param l
* @throws Exception
*/
- public void removeClusterChangedListener(ClusterChangedListener l) throws Exception;
+ public void removeMasterChangedListener(MasterChangedListener l) throws Exception;
/**
* @return the configuration
@@ -63,4 +63,9 @@
* @throws Exception
*/
public boolean waitForElection(int timeout) throws Exception;
+
+ /**
+ * @return a <Code>ClusterState</Code>
+ */
+ public ClusterState getState();
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Mon Dec 1 07:32:04 2008
@@ -21,10 +21,13 @@
import org.apache.activeblaze.group.Group;
import org.apache.activeblaze.group.Member;
import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.util.SendRequest;
import org.apache.activeblaze.wire.ElectionMessage;
import org.apache.activeblaze.wire.MessageType;
import org.apache.activeblaze.wire.PacketData;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,7 +38,8 @@
*/
public class BlazeClusterGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeClusterGroupChannel {
private static final Log LOG = LogFactory.getLog(BlazeClusterGroupChannelImpl.class);
- private ClusterGroup coordinatedGroup;
+ private ClusterGroup clusterGroup;
+ private ClusterState state;
/**
* Constructor
@@ -44,16 +48,61 @@
*/
public BlazeClusterGroupChannelImpl(String name) {
super(name);
+ this.state = new ClusterState(this);
+ }
+
+ /**
+ * @return
+ * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getState()
+ */
+ public ClusterState getState() {
+ return this.state;
+ }
+
+ public boolean init() throws Exception {
+ boolean result = super.init();
+ if (result) {
+ this.clusterGroup.init();
+ this.state.init();
+ }
+ return result;
+ }
+
+ public boolean shutDown() throws Exception {
+ boolean result = super.shutDown();
+ if (result) {
+ this.clusterGroup.shutDown();
+ this.state.shutDown();
+ }
+ return result;
+ }
+
+ public boolean start() throws Exception {
+ boolean result = super.start();
+ if (result) {
+ this.clusterGroup.start();
+ this.state.start();
+ }
+ return result;
+ }
+
+ public boolean stop() throws Exception {
+ boolean result = super.stop();
+ if (result) {
+ this.clusterGroup.stop();
+ this.state.stop();
+ }
+ return result;
}
/**
* @param l
* @throws Exception
- * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#addClusterChangedListener(org.apache.activeblaze.cluster.ClusterChangedListener)
+ * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#addMasterChangedListener(org.apache.activeblaze.cluster.MasterChangedListener)
*/
- public void addClusterChangedListener(ClusterChangedListener l) throws Exception {
+ public void addMasterChangedListener(MasterChangedListener l) throws Exception {
init();
- this.coordinatedGroup.addClusterChangedListener(l);
+ this.clusterGroup.addMasterChangedListener(l);
}
/**
@@ -62,8 +111,10 @@
* @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getMaster()
*/
public Member getMaster() throws Exception {
- init();
- return this.coordinatedGroup.getMaster();
+ if (this.clusterGroup != null) {
+ return this.clusterGroup.getMaster();
+ }
+ return null;
}
/**
@@ -72,18 +123,21 @@
* @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#isMaster()
*/
public boolean isMaster() throws Exception {
- init();
- return this.coordinatedGroup.isMasterMatch();
+ if (this.clusterGroup != null) {
+ return this.clusterGroup.isMasterMatch();
+ }
+ return false;
}
/**
* @param l
* @throws Exception
- * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.ClusterChangedListener)
+ * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.MasterChangedListener)
*/
- public void removeClusterChangedListener(ClusterChangedListener l) throws Exception {
- init();
- this.coordinatedGroup.removeClusterChangedListener(l);
+ public void removeMasterChangedListener(MasterChangedListener l) throws Exception {
+ if (this.clusterGroup != null) {
+ this.clusterGroup.removeMasterChangedListener(l);
+ }
}
/**
@@ -93,7 +147,7 @@
public BlazeClusterGroupConfiguration getConfiguration() {
return (BlazeClusterGroupConfiguration) this.configuration;
}
-
+
/**
* @param timeout
* @return
@@ -102,11 +156,21 @@
*/
public boolean waitForElection(int timeout) throws Exception {
init();
- return this.coordinatedGroup.waitForElection(timeout);
+ return this.clusterGroup.waitForElection(timeout);
+ }
+
+ /**
+ * @return true if there is elections have finished
+ * @throws Exception
+ */
+ public boolean isElectionFinished() throws Exception {
+ init();
+ return this.clusterGroup.isElectionFinished();
}
protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
if (isStarted()) {
+ processRequest(correlationId, data);
MessageType type = MessageType.valueOf(data.getType());
if (type == MessageType.BLAZE_DATA) {
doProcessBlazeData(data);
@@ -114,6 +178,8 @@
doProcessMemberData(data);
} else if (type.equals(MessageType.ELECTION_MESSAGE)) {
doProcessElectionData(id, data);
+ } else if (type.equals(MessageType.STATE_DATA)) {
+ doProcessStateData(data);
} else {
LOG.error("Unknown message type " + data);
}
@@ -124,9 +190,22 @@
return new MemberImpl(getId(), getName(), getConfiguration().getMasterWeight(), uri);
}
+ public MemberImpl getLocalMember() {
+ synchronized (this.localMutex) {
+ MemberImpl local = super.getLocalMember();
+ long oldWeight = local.getMasterWeight();
+ local.getData().setMasterWeight(getConfiguration().getMasterWeight());
+ if (oldWeight != getConfiguration().getMasterWeight()) {
+ // weight changed - so do an election in case the master changed
+ this.clusterGroup.scheduleElection();
+ }
+ return local;
+ }
+ }
+
protected Group createGroup() {
- this.coordinatedGroup = new ClusterGroup(this);
- return this.coordinatedGroup;
+ this.clusterGroup = new ClusterGroup(this);
+ return this.clusterGroup;
}
protected void doProcessElectionData(String id, PacketData data) throws Exception {
@@ -137,4 +216,42 @@
ClusterGroup group = (ClusterGroup) getGroup();
group.processElectionMessage(electionMessage, id);
}
+
+ protected void doProcessStateData(PacketData data) throws Exception {
+ this.state.processStateData(data);
+ }
+
+ /**
+ * send Request
+ *
+ * @param member
+ * @param destination
+ * @param message
+ * @param timeout
+ * @return
+ * @throws Exception
+ */
+ protected Message<?> sendRequest(MemberImpl to, MessageType type, Message<?> message, int timeout) throws Exception {
+ Message<?> result = null;
+ if (to != null) {
+ SendRequest request = new SendRequest();
+ PacketData data = getPacketData(type, message);
+ data.setReliable(true);
+ data.setResponseRequired(false);
+ data.setFromAddress(this.inboxURI);
+ Packet packet = new Packet(data);
+ packet.setTo(to.getAddress());
+ synchronized (this.messageRequests) {
+ this.messageRequests.put(data.getMessageId(), request);
+ }
+ this.unicast.downStream(packet);
+ data = (PacketData) request.get(timeout);
+ if (data != null) {
+ type = MessageType.valueOf(data.getType());
+ result = type.createMessage();
+ result.mergeFramed(data.getPayload());
+ }
+ }
+ return result;
+ }
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java Mon Dec 1 07:32:04 2008
@@ -24,22 +24,22 @@
*
*/
public class BlazeClusterGroupConfiguration extends BlazeGroupConfiguration{
- private long masterWeight = 0;
+ private int masterWeight = 0;
private int minimumGroupSize = 1;
- private long awaitGroupTimeout = getHeartBeatInterval()*2;
+ private int awaitGroupTimeout = Math.max(getHeartBeatInterval()*2,1000);
/**
* @return the coordinatorWeight
*/
- public long getMasterWeight() {
+ public int getMasterWeight() {
return this.masterWeight;
}
/**
* @param coordinatorWeight the coordinatorWeight to set
*/
- public void setMasterWeight(long coordinatorWeight) {
+ public void setMasterWeight(int coordinatorWeight) {
this.masterWeight = coordinatorWeight;
}
@@ -60,14 +60,14 @@
/**
* @return the awaitGroupTimeout
*/
- public long getAwaitGroupTimeout() {
+ public int getAwaitGroupTimeout() {
return this.awaitGroupTimeout;
}
/**
* @param awaitGroupTimeout the awaitGroupTimeout to set
*/
- public void setAwaitGroupTimeout(long awaitGroupTimeout) {
+ public void setAwaitGroupTimeout(int awaitGroupTimeout) {
this.awaitGroupTimeout = awaitGroupTimeout;
}
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java Mon Dec 1 07:32:04 2008
@@ -29,10 +29,8 @@
import org.apache.activeblaze.group.Group;
import org.apache.activeblaze.group.Member;
import org.apache.activeblaze.group.MemberImpl;
-import org.apache.activeblaze.util.AsyncGroupRequest;
import org.apache.activeblaze.wire.ElectionMessage;
import org.apache.activeblaze.wire.ElectionType;
-import org.apache.activeblaze.wire.MemberData;
import org.apache.activeblaze.wire.MessageType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,7 +45,7 @@
private final BlazeClusterGroupConfiguration configuration;
private ThreadPoolExecutor electionExecutor;
private MemberImpl master;
- private List<ClusterChangedListener> listeners = new CopyOnWriteArrayList<ClusterChangedListener>();
+ private List<MasterChangedListener> listeners = new CopyOnWriteArrayList<MasterChangedListener>();
final AtomicBoolean electionFinished = new AtomicBoolean(false);
private long startTime;
@@ -83,6 +81,7 @@
return thread;
}
});
+ election(null, true);
}
return result;
}
@@ -102,6 +101,12 @@
return result;
}
+ public boolean shutDown() throws Exception {
+ boolean result = super.shutDown();
+ setMaster(null);
+ return result;
+ }
+
/**
* @return true if there is elections have finished
*/
@@ -112,7 +117,7 @@
void setElectionFinished(boolean flag) {
synchronized (this.electionFinished) {
this.electionFinished.set(flag);
- // this.electionFinished.notifyAll();
+ this.electionFinished.notifyAll();
}
}
@@ -122,19 +127,38 @@
* @param data
* @throws Exception
*/
- protected MemberImpl processMember(MemberData data) throws Exception {
- MemberImpl result = super.processMember(data);
- if (result != null || (!isElectionFinished() && !data.getId().equals(getLocalMember().getId()))) {
- election(result, true);
+ protected void processMemberStarted(MemberImpl member) throws Exception {
+ if (!member.equals(getLocalMember())) {
+ synchronized (this.electionFinished) {
+ this.electionFinished.set(false);
+ }
+ }
+ super.processMemberStarted(member);
+ if (!member.equals(getLocalMember())) {
+ election(member, true);
}
- return result;
}
protected void processMemberStopped(MemberImpl member) throws Exception {
+ synchronized (this.electionFinished) {
+ this.electionFinished.set(false);
+ }
super.processMemberStopped(member);
election(null, false);
}
+ void scheduleElection() {
+ try {
+ this.electionFinished.set(false);
+ broadcastHeartBeat(getLocalMember());
+ election(null, true);
+ } catch (Exception e) {
+ if (isStarted()) {
+ LOG.error("Election failed ", e);
+ }
+ }
+ }
+
void election(final Member member, final boolean memberStarted) throws Exception {
if (isStarted() && this.electionExecutor != null && !this.electionExecutor.isShutdown()) {
synchronized (this.electionFinished) {
@@ -147,8 +171,10 @@
List<Runnable> list = new ArrayList<Runnable>(this.electionExecutor.getQueue());
for (Runnable r : list) {
ElectionService es = (ElectionService) r;
- es.stop();
- this.electionExecutor.remove(es);
+ if (es != null) {
+ es.stop();
+ this.electionExecutor.remove(es);
+ }
}
}
ElectionService es = new ElectionService(this, member, memberStarted);
@@ -162,11 +188,12 @@
*/
protected boolean isMasterMatch() {
String masterId = this.master != null ? this.master.getId() : "";
- return this.channel.getId().equals(masterId);
+ boolean result = isStarted() && this.channel.getId().equals(masterId);
+ return result;
}
protected MemberImpl getMaster() {
- return this.master;
+ return isStarted() ? this.master : null;
}
protected void setMaster(MemberImpl member) {
@@ -177,7 +204,7 @@
}
}
- protected void addClusterChangedListener(ClusterChangedListener l) {
+ protected void addMasterChangedListener(MasterChangedListener l) {
this.listeners.add(l);
}
@@ -187,33 +214,20 @@
* @param l
* @throws Exception
*/
- protected void removeClusterChangedListener(ClusterChangedListener l) {
+ protected void removeMasterChangedListener(MasterChangedListener l) {
this.listeners.remove(l);
}
- protected void fireClusterChanged(MemberImpl newMaster) {
- for (ClusterChangedListener l : this.listeners) {
- l.ClusterChanged(newMaster);
- }
- }
-
- boolean callElection() throws Exception {
- List<MemberImpl> members = new ArrayList<MemberImpl>(this.members.values());
- List<MemberImpl> sorted = ClusterGroup.sortMemberList(members);
- AsyncGroupRequest request = new AsyncGroupRequest();
- boolean doCall = false;
- for (Member member : sorted) {
- if (this.channel.getId().equals(member.getId())) {
- doCall = true;
- } else if (doCall) {
- ElectionMessage msg = new ElectionMessage();
- msg.setMember(this.channel.getLocalMember().getData());
- msg.setElectionType(ElectionType.ELECTION);
- this.channel.broadcastMessage(request, msg.type(), msg);
+ protected void fireClusterChanged(final MemberImpl newMaster) {
+ if (isStarted()) {
+ for (final MasterChangedListener l : this.listeners) {
+ this.listenerService.execute(new Runnable() {
+ public void run() {
+ l.masterChanged(newMaster);
+ }
+ });
}
}
- boolean result = request.isSuccess(this.configuration.getHeartBeatInterval());
- return result;
}
void processElectionMessage(ElectionMessage msg, String correlationId) throws Exception {
@@ -225,12 +239,15 @@
reply.setElectionType(ElectionType.ANSWER);
reply.setMember(this.channel.getLocalMember().getData());
this.channel.sendReply(from, msg.type(), reply, correlationId);
- election(null, false);
+ // election(null, false);
} else if (msg.getElectionType().equals(ElectionType.MASTER)) {
- setMaster(from);
- LOG.debug(getLocalMember() + " Master is " + from);
- setMaster(from);
- setElectionFinished(true);
+ if (isValidMaster(from)) {
+ setMaster(from);
+ setElectionFinished(true);
+ } else {
+ // bully a new election - not a valid master
+ scheduleElection();
+ }
}
}
}
@@ -266,10 +283,30 @@
return !isStopped() && this.electionFinished.get();
}
+ protected boolean isValidMaster(MemberImpl newMaster) {
+ boolean result = newMaster.equals(getLocalMember());
+ if (!result) {
+ List<MemberImpl> sorted = ClusterGroup.sortMemberList(new ArrayList<MemberImpl>(this.members.values()));
+ int ourOrder = sorted.indexOf(getLocalMember());
+ int masterOrder = sorted.indexOf(newMaster);
+ result = masterOrder >= ourOrder;
+ }
+ return result;
+ }
+
protected static List<MemberImpl> sortMemberList(List<MemberImpl> list) {
Collections.sort(list, new Comparator<Member>() {
public int compare(Member m1, Member m2) {
- long result = m1.getCoordinatorWeight() - m2.getCoordinatorWeight();
+ if (m1 == m2) {
+ return 0;
+ }
+ if (m1 == null) {
+ return -1;
+ }
+ if (m2 == null) {
+ return 1;
+ }
+ int result = m1.getMasterWeight() - m2.getMasterWeight();
if (result == 0) {
result = m1.getId().compareTo(m2.getId());
}
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java Mon Dec 1 07:32:04 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.BlazeRuntimeException;
+
+/**
+ * Exception raised when updating Cluster State
+ *
+ */
+public class ClusterNotMasterException extends BlazeRuntimeException {
+ private static final long serialVersionUID = -5543167215616832680L;
+
+ /**
+ * Constructs a new exception with <code>null</code> as its detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause}.
+ */
+ public ClusterNotMasterException() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message. The cause is not initialized, and may subsequently
+ * be initialized by a call to {@link #initCause}.
+ *
+ * @param message
+ * the detail message. The detail message is saved for later retrieval by the {@link #getMessage()}
+ * method.
+ */
+ public ClusterNotMasterException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and cause.
+ * <p>
+ * Note that the detail message associated with <code>cause</code> is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message
+ * the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
+ * @param cause
+ * the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+ * value is permitted, and indicates that the cause is nonexistent or unknown.)
+ */
+ public ClusterNotMasterException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause and a detail message of
+ * <tt>(cause==null ? null : cause.toString())</tt> (which typically contains the class and detail message of
+ * <tt>cause</tt>). This constructor is useful for exceptions that are little more than wrappers for other
+ * throwables (for example, {@link java.security.PrivilegedActionException}).
+ *
+ * @param cause
+ * the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+ * value is permitted, and indicates that the cause is nonexistent or unknown.)
+ */
+ public ClusterNotMasterException(Throwable cause) {
+ super(cause);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java Mon Dec 1 07:32:04 2008
@@ -16,157 +16,914 @@
*/
package org.apache.activeblaze.cluster;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberChangedListener;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.util.IOUtils;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.StateData;
+import org.apache.activeblaze.wire.StateKeyData;
+import org.apache.activeblaze.wire.StateType;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* <P>
- * A <CODE>GroupState</CODE> is a distributed collaboration implementation that is
- * used to shared state and process messages amongst a distributed group of
- * other <CODE>Group</CODE> instances. Membership of a group is handled
+ * A <CODE>ClusterState</CODE> is a distributed collaboration implementation that is used to shared state and process
+ * messages amongst a distributed group of other <CODE>Group</CODE> instances. Membership of a group is handled
* automatically using discovery.
- * <P>
- * The underlying transport is JMS and there are some optimizations that occur
- * for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
- * with any JMS implementation.
- *
- * <P>
- * Updates to the group shared map are controlled by a coordinator. The
- * coordinator is elected by the member with the lowest lexicographical id -
- * based on the bully algorithm [Silberschatz et al. 1993]
- * <P>
- * The {@link #selectCordinator(Collection<Member> members)} method may be
- * overridden to implement a custom mechanism for choosing how the coordinator
- * is elected for the map.
- * <P>
- * New <CODE>Group</CODE> instances have their state updated by the
- * coordinator, and coordinator failure is handled automatically within the
- * group.
- * <P>
- * All map updates are totally ordered through the coordinator, whilst read
- * operations happen locally.
- * <P>
- * A <CODE>Group</CODE> supports the concept of owner only updates(write
- * locks), shared updates, entry expiration times and removal on owner exit -
- * all of which are optional. In addition, you can grab and release locks for
- * values in the map, independently of who created them.
- * <P>
- * In addition, members of a group can broadcast messages and implement
- * request/response with other <CODE>Group</CODE> instances.
- *
- * <P>
- * @param <String>
- * @param <V>
*
*/
+public class ClusterState extends BaseService implements Map<String, Object>, MemberChangedListener {
+ final static Log LOG = LogFactory.getLog(ClusterState.class);
+ private boolean alwaysLock = true;
+ private boolean removeOwnedObjectsOnExit;
+ private boolean releaseLockOnExit = true;
+ private int timeToLive;
+ private int lockTimeToLive;
+ private int requestTimeout = 5000;
+ final BlazeClusterGroupChannelImpl channel;
+ private final List<ClusterStateChangedListener> clusterStateChangedListeners = new CopyOnWriteArrayList<ClusterStateChangedListener>();
+ private final Map<String, StateValue> localMap = new ConcurrentHashMap<String, StateValue>();
+ private ExecutorService stateChangedExecutor;
+ private ScheduledExecutorService expirationService;
-public class ClusterState<String,V> implements Map<String,V>{
-
- private final BlazeClusterGroupChannelImpl channel;
protected ClusterState(BlazeClusterGroupChannelImpl channel) {
- this.channel=channel;
+ this.channel = channel;
}
+
+ /**
+ * Test to see if we own the key
+ *
+ * @param key
+ * @return true if the owner of the key
+ */
+ public boolean isOwner(String key) {
+ boolean result = false;
+ StateValue value = this.localMap.get(key);
+ if (value != null) {
+ result = value.getKey().getOwner().equals(this.channel.getLocalMember());
+ }
+ return result;
+ }
+
+ /**
+ * Get the owner of a Key
+ *
+ * @param key
+ * @return the owner or null if the key doesnn't exist
+ */
+ public Member getOwner(String key) {
+ StateValue value = this.localMap.get(key);
+ if (value != null) {
+ return value.getKey().getOwner();
+ }
+ return null;
+ }
+
+ /**
+ * Unlock a key
+ *
+ * @param key
+ * @throws Exception
+ */
+ public void unlock(String key) throws Exception {
+ StateKey stateKey = new StateKey(this.channel.getLocalMember(), key);
+ stateKey.setLocked(false);
+ stateKey.setRemoveOnExit(isRemoveOwnedObjectsOnExit());
+ stateKey.setReleaseLockOnExit(isReleaseLockOnExit());
+ stateKey.setTimeToLive(getTimeToLive());
+ StateData stateData = new StateData();
+ stateData.setKeyData(stateKey.getKeyData());
+ stateData.setLockWrite(true);
+ sendMasterRequest(stateData);
+ }
+
+ /**
+ * Lock a Key
+ *
+ * @param key
+ * @throws Exception
+ */
+ public void lock(String key) throws Exception {
+ lock(key, getLockTimeToLive());
+ }
+
+ /**
+ * Lock a Key
+ *
+ * @param key
+ * @param leaseTime
+ * @throws Exception
+ */
+ public void lock(String key, long leaseTime) throws Exception {
+ StateKey stateKey = new StateKey(this.channel.getLocalMember(), key);
+ stateKey.setLocked(true);
+ stateKey.setRemoveOnExit(isRemoveOwnedObjectsOnExit());
+ stateKey.setReleaseLockOnExit(isReleaseLockOnExit());
+ stateKey.setTimeToLive(getTimeToLive());
+ stateKey.setLockLeaseTime(leaseTime);
+ StateData stateData = new StateData();
+ stateData.setKeyData(stateKey.getKeyData());
+ stateData.setLockWrite(true);
+ sendMasterRequest(stateData);
+ }
+
/**
*
* @see java.util.Map#clear()
*/
public void clear() {
- // TODO Auto-generated method stub
-
+ checkStatus();
+ if (this.localMap != null && !this.localMap.isEmpty()) {
+ Set<String> keys = null;
+ keys = new HashSet<String>(this.localMap.keySet());
+ for (String key : keys) {
+ remove(key);
+ }
+ }
+ this.localMap.clear();
}
+
/**
* @param key
* @return
* @see java.util.Map#containsKey(java.lang.Object)
*/
- public boolean containsKey(Object key) {
- // TODO Auto-generated method stub
- return false;
+ public boolean containsKey(java.lang.Object key) {
+ return this.localMap.containsKey(key);
}
+
/**
* @param value
* @return
* @see java.util.Map#containsValue(java.lang.Object)
*/
- public boolean containsValue(Object value) {
- // TODO Auto-generated method stub
- return false;
+ public boolean containsValue(java.lang.Object value) {
+ StateValue sv = new StateValue(null, value, null);
+ return this.localMap.containsValue(sv);
}
+
/**
* @return
* @see java.util.Map#entrySet()
*/
- public Set<java.util.Map.Entry<String, V>> entrySet() {
- // TODO Auto-generated method stub
- return null;
+ public Set<java.util.Map.Entry<String, Object>> entrySet() {
+ Map<String, Object> result = new HashMap<String, Object>();
+ try {
+ for (StateValue entry : this.localMap.values()) {
+ String key = entry.getKey().getKey();
+ Object value = entry.getValue();
+ result.put(key, value);
+ }
+ } catch (Exception e) {
+ throw new BlazeRuntimeException(e);
+ }
+ return result.entrySet();
}
+
/**
* @param key
* @return
* @see java.util.Map#get(java.lang.Object)
*/
- public V get(Object key) {
- // TODO Auto-generated method stub
- return null;
+ public Object get(java.lang.Object key) {
+ StateValue v = this.localMap.get(key);
+ try {
+ return v != null ? v.getValue() : null;
+ } catch (Exception e) {
+ throw new BlazeRuntimeException(e);
+ }
}
+
/**
* @return
* @see java.util.Map#isEmpty()
*/
public boolean isEmpty() {
- // TODO Auto-generated method stub
- return false;
+ return this.localMap.isEmpty();
}
+
/**
* @return
* @see java.util.Map#keySet()
*/
public Set<String> keySet() {
- // TODO Auto-generated method stub
- return null;
+ return this.localMap.keySet();
}
+
/**
* @param key
* @param value
* @return
* @see java.util.Map#put(java.lang.Object, java.lang.Object)
*/
- public V put(String key, V value) {
- // TODO Auto-generated method stub
- return null;
+ public Object put(String key, Object value) {
+ return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(),
+ getLockTimeToLive());
+ }
+
+ /**
+ * @param key
+ * @param value
+ * @param lock
+ * @param removeOnExit
+ * @param releaseLockOnExit
+ * @param timeToLive
+ * @param leaseTime
+ * @return
+ * @throws BlazeRuntimeException
+ */
+ public Object put(String key, Object value, boolean lock, boolean removeOnExit, boolean releaseLockOnExit,
+ long timeToLive, long leaseTime) throws BlazeRuntimeException {
+ checkStatus();
+ Object resultValue = null;
+ try {
+ StateKey stateKey = new StateKey(this.channel.getLocalMember(), key);
+ stateKey.setLocked(lock);
+ stateKey.setRemoveOnExit(removeOnExit);
+ stateKey.setReleaseLockOnExit(releaseLockOnExit);
+ stateKey.setTimeToLive(timeToLive);
+ stateKey.setLockLeaseTime(leaseTime);
+ StateData stateData = new StateData();
+ stateData.setKeyData(stateKey.getKeyData());
+ stateData.setStateType(StateType.INSERT);
+ stateData.setMapWrite(true);
+ stateData.setValue(IOUtils.getBuffer(value));
+ resultValue = sendMasterRequest(stateData);
+ } catch (Exception e) {
+ if (e instanceof ClusterUpdateException) {
+ throw (ClusterUpdateException) e;
+ }
+ throw new ClusterUpdateException(e);
+ }
+ return resultValue;
}
+
/**
* @param t
* @see java.util.Map#putAll(java.util.Map)
*/
- public void putAll(Map<? extends String, ? extends V> t) {
- // TODO Auto-generated method stub
-
+ public void putAll(Map<? extends String, ? extends Object> t) {
+ putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(),
+ getLockTimeToLive());
}
+
+ /**
+ * put all
+ *
+ * @param t
+ * @param lock
+ * @param removeOnExit
+ * @param releaseLockOnExit
+ * @param timeToLive
+ * @param lockTimeToLive
+ */
+ public void putAll(Map<? extends String, ? extends Object> t, boolean lock, boolean removeOnExit,
+ boolean releaseLockOnExit, long timeToLive, long lockTimeToLive) {
+ for (java.util.Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
+ put(entry.getKey(), entry.getValue(), lock, removeOnExit, releaseLockOnExit, timeToLive, lockTimeToLive);
+ }
+ }
+
/**
* @param key
* @return
* @see java.util.Map#remove(java.lang.Object)
*/
- public V remove(Object key) {
- // TODO Auto-generated method stub
- return null;
+ public Object remove(java.lang.Object key) {
+ checkStatus();
+ StateKey stateKey = new StateKey(this.channel.getLocalMember(), key.toString());
+ StateData stateData = new StateData();
+ stateData.setKeyData(stateKey.getKeyData());
+ stateData.setMapWrite(true);
+ stateData.setStateType(StateType.DELETE);
+ try {
+ return this.channel.sendRequest((MemberImpl) this.channel.getMaster(), stateData.type(), stateData,
+ getRequestTimeout());
+ } catch (Exception e) {
+ throw new BlazeRuntimeException(e);
+ }
}
+
/**
* @return
* @see java.util.Map#size()
*/
public int size() {
- // TODO Auto-generated method stub
- return 0;
+ return this.localMap.size();
}
+
/**
* @return
* @see java.util.Map#values()
*/
- public Collection<V> values() {
- // TODO Auto-generated method stub
- return null;
+ public Collection<Object> values() {
+ List<Object> result = new ArrayList<Object>();
+ try {
+ for (StateValue v : this.localMap.values()) {
+ result.add(v.getValue());
+ }
+ } catch (Exception e) {
+ throw new BlazeRuntimeException(e);
+ }
+ return result;
+ }
+
+ /**
+ * @return the alwaysLock
+ */
+ public boolean isAlwaysLock() {
+ return this.alwaysLock;
+ }
+
+ /**
+ * @param alwaysLock
+ * the alwaysLock to set
+ */
+ public void setAlwaysLock(boolean alwaysLock) {
+ this.alwaysLock = alwaysLock;
+ }
+
+ /**
+ * @return the removeOwnedObjectsOnExit
+ */
+ public boolean isRemoveOwnedObjectsOnExit() {
+ return this.removeOwnedObjectsOnExit;
+ }
+
+ /**
+ * @param removeOwnedObjectsOnExit
+ * the removeOwnedObjectsOnExit to set
+ */
+ public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
+ this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
+ }
+
+ /**
+ * @return the releaseLockOnExit
+ */
+ public boolean isReleaseLockOnExit() {
+ return this.releaseLockOnExit;
+ }
+
+ /**
+ * @param releaseLockOnExit
+ * the releaseLockOnExit to set
+ */
+ public void setReleaseLockOnExit(boolean releaseLockOnExit) {
+ this.releaseLockOnExit = releaseLockOnExit;
+ }
+
+ /**
+ * @return the timeToLive
+ */
+ public int getTimeToLive() {
+ return this.timeToLive;
+ }
+
+ /**
+ * @param timeToLive
+ * the timeToLive to set
+ */
+ public void setTimeToLive(int timeToLive) {
+ this.timeToLive = timeToLive;
+ }
+
+ /**
+ * @return the lockTimeToLive
+ */
+ public int getLockTimeToLive() {
+ return this.lockTimeToLive;
+ }
+
+ /**
+ * @param lockTimeToLive
+ * the lockTimeToLive to set
+ */
+ public void setLockTimeToLive(int lockTimeToLive) {
+ this.lockTimeToLive = lockTimeToLive;
+ }
+
+ /**
+ * @return the requestTimeout
+ */
+ public int getRequestTimeout() {
+ return this.requestTimeout;
+ }
+
+ /**
+ * @param requestTimeout
+ * the requestTimeout to set
+ */
+ public void setRequestTimeout(int requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
+ public boolean init() throws Exception {
+ boolean result = super.init();
+ if (result) {
+ this.channel.addMemberChangedListener(this);
+ }
+ return result;
+ }
+
+ public boolean shutDown() throws Exception {
+ boolean result = super.shutDown();
+ if (result) {
+ this.channel.removeMemberChangedListener(this);
+ }
+ return result;
+ }
+
+ public boolean start() throws Exception {
+ boolean result = super.start();
+ if (result) {
+ this.stateChangedExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "Cluster State{" + ClusterState.this.channel.getLocalMember()
+ + "}");
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ this.expirationService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ Runnable expiration = new Runnable() {
+ public void run() {
+ try {
+ expirationSweep();
+ } catch (Exception e) {
+ ClusterState.LOG.error("Failed to send heartbeat", e);
+ }
+ }
+ };
+ this.expirationService.scheduleAtFixedRate(expiration, 500, 500, TimeUnit.MILLISECONDS);
+ }
+ return result;
+ }
+
+ public boolean stop() throws Exception {
+ boolean result = super.stop();
+ if (result) {
+ this.stateChangedExecutor.shutdown();
+ this.expirationService.shutdown();
+ }
+ return result;
+ }
+
+ /**
+ * Add a <Code>ClusterStateChangedListener</Code>
+ *
+ * @param l
+ */
+ public void addClusterStateChangedListener(ClusterStateChangedListener l) {
+ this.clusterStateChangedListeners.add(l);
+ }
+
+ /**
+ * Add a <Code>ClusterStateChangedListener</Code>
+ *
+ * @param l
+ */
+ public void removeClusterStateChangedListener(ClusterStateChangedListener l) {
+ this.clusterStateChangedListeners.remove(l);
+ }
+
+ /**
+ * Implementation of org.apache.activeblaze.group.MemberChangedListener for listening to membership changes
+ *
+ * @param member
+ * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
+ */
+ public void memberStarted(Member member) {
+ try {
+ if (this.channel.isMaster()) {
+ this.channel.waitForElection(0);
+ // even though we may no longer be the master - we
+ // was the master before the new node started - so
+ // we take responsibility for updating the new node
+ for (StateValue value : this.localMap.values()) {
+ StateData newStateData = value.getData().clone();
+ newStateData.setMapWrite(false);
+ newStateData.setMapUpdate(true);
+ newStateData.setStateType(StateType.SYNC);
+ broadcastStateUpdate(newStateData, "");
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to sync up new group member: " + member);
+ }
+ }
+
+ /**
+ * Implementation of org.apache.activeblaze.group.MemberChangedListener for listening to membership changes
+ *
+ * @param member
+ * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
+ */
+ public void memberStopped(Member member) {
+ if (isStarted()) {
+ // remove any values held by this member
+ List<StateKey> list = new ArrayList<StateKey>();
+ for (StateValue stateValue : this.localMap.values()) {
+ StateKey key = stateValue.getKey();
+ if (key.getOwner().equals(member)) {
+ if (key.isReleaseLockOnExit()) {
+ key.setLocked(false);
+ }
+ if (key.isRemoveOnExit()) {
+ list.add(key);
+ }
+ }
+ }
+ for (StateKey key : list) {
+ StateValue value = this.localMap.remove(key.getKey());
+ if (value != null) {
+ try {
+ fireMapChanged(member, key.getKey(), value.getValue(), null, false);
+ } catch (Exception e) {
+ LOG.warn("Failed to fireMapChanged for key " + key.getKey(), e);
+ }
+ }
+ }
+ }
+ }
+
+ protected void processStateData(PacketData data) throws Exception {
+ MessageType type = MessageType.STATE_DATA;
+ StateData stateData = (StateData) type.createMessage();
+ Buffer payload = data.getPayload();
+ stateData.mergeFramed(payload);
+ String correlationId = "";
+ if (data.hasMessageId()) {
+ correlationId = data.getMessageId().toStringUtf8();
+ }
+ if (!stateData.getError()) {
+ if (stateData.getMapUpdate()) {
+ processMapUpdate(stateData);
+ } else if (stateData.getLockUpdate()) {
+ processLockUpdate(stateData, correlationId);
+ } else if (stateData.getLockWrite()) {
+ processLockWrite(stateData, correlationId);
+ } else if (stateData.getMapWrite()) {
+ processMapOperations(stateData, correlationId);
+ } else {
+ LOG.error("Don't know how to process " + stateData);
+ }
+ }
+ }
+
+ protected void processLockWrite(StateData stateData, String correlationId) throws Exception {
+ if (this.channel.waitForElection(0)) {
+ // reset values for when we broadcast an update
+ stateData.setLockUpdate(true);
+ stateData.setLockWrite(false);
+ StateValue stateValue = new StateValue(stateData);
+ boolean newLock = stateValue.getKey().isLocked();
+ MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember());
+ long newLockExpiration = newLock ? stateValue.getKey().getLockExpiration() : 0l;
+ if (this.channel.isMaster()) {
+ StateKey originalKey = getKey(stateValue.getKey().getKey());
+ if (originalKey != null) {
+ if (originalKey.isLocked()) {
+ if (!originalKey.getOwner().equals(stateValue.getKey().getOwner())) {
+ StateValue stateReply = stateValue.copy();
+ Serializable reply = new ClusterUpdateException("Owned by " + originalKey.getOwner());
+ stateReply.getData().setError(true);
+ stateReply.getData().setValue(IOUtils.getBuffer(reply));
+ this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData()
+ .type(), stateReply.getData(), correlationId);
+ } else {
+ originalKey.setLocked(newLock);
+ originalKey.setOwner(newOwner);
+ originalKey.setLockExpiration(newLockExpiration);
+ broadcastStateUpdate(stateData, correlationId);
+ }
+ } else {
+ originalKey.setLocked(newLock);
+ originalKey.setOwner(newOwner);
+ originalKey.setLockExpiration(newLockExpiration);
+ broadcastStateUpdate(stateData, correlationId);
+ }
+ }
+ } else {
+ StateValue stateReply = stateValue.copy();
+ stateReply.getData().clearStateType();
+ Serializable reply = new ClusterNotMasterException(this.channel.getLocalMember() + " Not Master");
+ stateReply.getData().setError(true);
+ stateReply.getData().setValue(IOUtils.getBuffer(reply));
+ this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData().type(),
+ stateReply.getData(), correlationId);
+ }
+ }
+ }
+
+ protected void processLockUpdate(StateData stateData, String correlationId) throws Exception {
+ if (this.channel.waitForElection(0)) {
+ StateValue stateValue = new StateValue(stateData);
+ boolean newLock = stateValue.getKey().isLocked();
+ MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember());
+ long newLockExpiration = newLock ? stateValue.getKey().getLockExpiration() : 0l;
+ if (!this.channel.isMaster()) {
+ StateKey originalKey = getKey(stateValue.getKey().getKey());
+ if (originalKey != null) {
+ originalKey.setLocked(newLock);
+ originalKey.setOwner(newOwner);
+ originalKey.setLockExpiration(newLockExpiration);
+ }
+ }
+ }
+ }
+
+ protected void processMapOperations(StateData data, String correlationId) throws Exception {
+ StateValue stateValue = new StateValue(data);
+ StateKey key = stateValue.getKey();
+ StateType stateType = stateValue.getData().getStateType();
+ if (stateType != null) {
+ boolean insert = stateType.equals(StateType.INSERT) || stateType.equals(StateType.SYNC);
+ boolean containsKey = this.localMap.containsKey(key.getKey());
+ if (this.channel.waitForElection(0)) {
+ if (this.channel.isMaster()) {
+ if (containsKey) {
+ StateKey originalKey = getKey(key.getKey());
+ if (originalKey.getOwner().equals(key.getOwner()) || !originalKey.isLocked()) {
+ StateValue old = null;
+ if (insert) {
+ old = this.localMap.put(key.getKey(), stateValue);
+ } else {
+ old = this.localMap.remove(key.getKey());
+ }
+ StateData newStateData = data.clone();
+ newStateData.clearOldvalue();
+ if (old != null && old.getData().getValue() != null) {
+ newStateData.setOldvalue(old.getData().getValue());
+ }
+ newStateData.setMapWrite(false);
+ newStateData.setMapUpdate(true);
+ broadcastStateUpdate(newStateData, correlationId);
+ fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), stateValue.getValue(), false);
+ } else {
+ StateValue stateReply = stateValue.copy();
+ Serializable reply = new ClusterUpdateException("Owned by " + originalKey.getOwner());
+ stateReply.getData().setValue(IOUtils.getBuffer(reply));
+ stateReply.getData().setError(true);
+ this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData()
+ .type(), stateReply.getData(), correlationId);
+ }
+ } else {
+ if (insert) {
+ this.localMap.put(key.getKey(), stateValue);
+ StateData newStateData = data.clone();
+ newStateData.setMapWrite(false);
+ newStateData.setMapUpdate(true);
+ broadcastStateUpdate(newStateData, correlationId);
+ fireMapChanged(key.getOwner(), key.getKey(), null, stateValue.getValue(), false);
+ } else {
+ // this shouldn't happen - as we are trying to remove
+ // a non-existent key
+ LOG.warn("Cluster State in inconsistent state - master not aware of " + key.getKey()
+ + " from " + key.getOwner());
+ }
+ }
+ } else {
+ StateValue stateReply = stateValue.copy();
+ stateReply.getData().clearStateType();
+ Serializable reply = new ClusterNotMasterException(this.channel.getLocalMember() + " Not Master");
+ stateReply.getData().setError(true);
+ stateReply.getData().setValue(IOUtils.getBuffer(reply));
+ this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData().type(),
+ stateReply.getData(), correlationId);
+ }
+ }
+ }
+ }
+
+ protected void processMapUpdate(StateData data) throws Exception {
+ StateKeyData skd = data.getKeyData();
+ StateKey key = new StateKey(skd);
+ StateValue stateValue = new StateValue(data);
+ boolean containsKey = this.localMap.containsKey(key.getKey());
+ if (this.channel.waitForElection(0)) {
+ boolean insert = data.getStateType().equals(StateType.SYNC) || data.getStateType().equals(StateType.INSERT);
+ if (!this.channel.isMaster() || data.getStateType().equals(StateType.SYNC)) {
+ if (containsKey) {
+ if (key.isLockExpired()) {
+ StateValue old = this.localMap.get(key.getKey());
+ if (old != null) {
+ old.getKey().setLocked(false);
+ }
+ } else {
+ StateValue old = null;
+ if (insert) {
+ old = this.localMap.put(key.getKey(), stateValue);
+ } else {
+ old = this.localMap.remove(key.getKey());
+ StateData copy = data.clone();
+ copy.clearValue();
+ copy.clearOldvalue();
+ stateValue = new StateValue(copy);
+ }
+ fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), stateValue.getValue(), data
+ .getExpired());
+ }
+ } else {
+ if (insert) {
+ this.localMap.put(key.getKey(), stateValue);
+ fireMapChanged(key.getOwner(), key.getKey(), null, stateValue.getValue(), false);
+ }
+ }
+ }
+ }
+ }
+
+ protected void checkStatus() throws RuntimeException {
+ if (!isStarted()) {
+ throw new IllegalStateException("ClusterState " + this.channel.getName() + " not started");
+ }
+ try {
+ this.channel.waitForElection(0);
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ throw new BlazeRuntimeException(e);
+ }
+ }
+
+ protected void expirationSweep() throws Exception {
+ if (this.channel.waitForElection(0)) {
+ if (this.channel.isMaster() && isStarted()) {
+ List<String> expiredMessages = null;
+ List<StateValue> expiredLocks = null;
+ long currentTime = System.currentTimeMillis();
+ for (StateValue value : this.localMap.values()) {
+ StateKey key = value.getKey();
+ if (key.isExpired(currentTime)) {
+ if (expiredMessages == null) {
+ expiredMessages = new ArrayList<String>();
+ }
+ expiredMessages.add(key.getKey());
+ } else if (key.isLockExpired(currentTime)) {
+ key.setLocked(false);
+ if (expiredLocks == null) {
+ expiredLocks = new ArrayList<StateValue>();
+ }
+ expiredLocks.add(value);
+ }
+ }
+ // do the actual removal of entries in a separate thread
+ if (expiredMessages != null) {
+ final List<String> expire = expiredMessages;
+ this.stateChangedExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ doMessageExpiration(expire);
+ } catch (Exception e) {
+ LOG.error("Message expiration failed", e);
+ }
+ }
+ });
+ }
+ if (expiredLocks != null) {
+ final List<StateValue> expire = expiredLocks;
+ this.stateChangedExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ doLockExpiration(expire);
+ } catch (Exception e) {
+ LOG.error("Lock expiration failed", e);
+ }
+ }
+ });
+ }
+ }
+ }
+ }
+
+ protected void doMessageExpiration(List<String> list) throws Exception {
+ if (isStarted() && this.channel.isElectionFinished() && this.channel.isMaster()) {
+ for (String k : list) {
+ StateValue old = this.localMap.remove(k);
+ if (old != null) {
+ StateValue value = old.copy();
+ value.getData().setStateType(StateType.DELETE);
+ value.getData().setExpired(true);
+ value.getData().clearMapWrite();
+ value.getData().setMapUpdate(true);
+ broadcastStateUpdate(value.getData(), "");
+ fireMapChanged(new MemberImpl(value.getData().getKeyData().getMember()), k, old.getValue(), null,
+ true);
+ }
+ }
+ }
+ }
+
+ protected StateKey getKey(String key) {
+ StateValue stateValue = this.localMap != null ? this.localMap.get(key) : null;
+ return stateValue != null ? stateValue.getKey() : null;
+ }
+
+ protected void doLockExpiration(List<StateValue> list) throws Exception {
+ if (isStarted() && this.channel.isElectionFinished() && this.channel.isMaster()) {
+ for (StateValue value : list) {
+ StateValue copy = value.copy();
+ copy.getData().setStateType(StateType.DELETE);
+ copy.getData().setLockExpired(true);
+ broadcastStateUpdate(copy.getData(), "");
+ }
+ }
+ }
+
+ protected void fireMapChanged(final Member owner, final String key, final Object oldValue, final Object newValue,
+ final boolean expired) {
+ if (isStarted() && this.stateChangedExecutor != null && !this.stateChangedExecutor.isShutdown()) {
+ this.stateChangedExecutor.execute(new Runnable() {
+ public void run() {
+ doFireMapChanged(owner, key, oldValue, newValue, expired);
+ }
+ });
+ }
+ }
+
+ protected void doFireMapChanged(Member owner, String key, Object oldValue, Object newValue, boolean expired) {
+ if (isStarted()) {
+ for (ClusterStateChangedListener l : this.clusterStateChangedListeners) {
+ if (oldValue == null) {
+ l.mapInsert(owner, key, newValue);
+ } else if (newValue == null) {
+ l.mapRemove(owner, key, oldValue, expired);
+ } else {
+ l.mapUpdate(owner, key, oldValue, newValue);
+ }
+ }
+ }
+ }
+
+ protected void broadcastStateUpdate(StateData value, String correlationId) {
+ if (isStarted()) {
+ try {
+ this.channel.broadcastMessage(value.type(), value, correlationId);
+ } catch (Exception e) {
+ if (isStarted()) {
+ LOG.error("Failed to send StateData " + value, e);
+ }
+ }
+ }
+ }
+
+ protected Object sendMasterRequest(StateData stateData) throws Exception {
+ int retryCount = 0;
+ MemberImpl master = null;
+ while (retryCount < 5) {
+ this.channel.waitForElection(0);
+ master = (MemberImpl) this.channel.getMaster();
+ StateData resultData = (StateData) this.channel.sendRequest(master, stateData.type(), stateData,
+ getRequestTimeout());
+ retryCount++;
+ if (resultData != null) {
+ Object resultValue = IOUtils.getObject(resultData.getValue());
+ if (resultValue instanceof ClusterNotMasterException) {
+ LOG.warn(this.getLocal().getName() + " Request sent to an old master " + master
+ + " - resending to new master: " + this.channel.getMaster());
+ Thread.sleep(1000);
+ continue;
+ }
+ if (resultValue instanceof ClusterUpdateException) {
+ throw (ClusterUpdateException) resultValue;
+ }
+ Object result = IOUtils.getObject(resultData.getOldvalue());
+ return result;
+ }
+ }
+ throw new ClusterUpdateException(getLocal() + " Request timed out to " + master);
+ }
+
+ protected MemberImpl getLocal() {
+ return this.channel.getLocalMember();
}
}
\ No newline at end of file
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java Mon Dec 1 07:32:04 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.group.Member;
+
+
+
+/**
+ *Get notifications about changes to the state of the map
+ *
+ */
+public interface ClusterStateChangedListener {
+
+ /**
+ * Called when a key/value pair is inserted into the map
+ * @param owner
+ * @param key
+ * @param value
+ */
+ void mapInsert(Member owner,String key, Object value);
+
+ /**
+ * Called when a key value is updated in the map
+ * @param owner
+ * @param Key
+ * @param oldValue
+ * @param newValue
+ */
+ void mapUpdate(Member owner,String Key,Object oldValue,Object newValue);
+
+ /**
+ * Called when a key value is removed from the map
+ * @param owner
+ * @param key
+ * @param value
+ * @param expired
+ */
+ void mapRemove(Member owner,String key, Object value,boolean expired);
+}
\ No newline at end of file
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java Mon Dec 1 07:32:04 2008
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.BlazeRuntimeException;
+
+/**
+ * Exception raised when updating Cluster State
+ *
+ */
+public class ClusterUpdateException extends BlazeRuntimeException{
+
+ private static final long serialVersionUID = 8778617962968095062L;
+
+ /**
+ * Constructs a new exception with <code>null</code> as its detail message.
+ * The cause is not initialized, and may subsequently be initialized by a
+ * call to {@link #initCause}.
+ */
+ public ClusterUpdateException() {
+ super();
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public ClusterUpdateException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * <code>cause</code> is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public ClusterUpdateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs a new exception with the specified cause and a detail
+ * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+ * typically contains the class and detail message of <tt>cause</tt>).
+ * This constructor is useful for exceptions that are little more than
+ * wrappers for other throwables (for example, {@link
+ * java.security.PrivilegedActionException}).
+ *
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public ClusterUpdateException(Throwable cause) {
+ super(cause);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java Mon Dec 1 07:32:04 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.group.Member;
+
+/**
+ * Default implementation of ClusterStateListener
+ *
+ */
+public class DefaultClusterStateListener implements ClusterStateChangedListener {
+ /**
+ * @param owner
+ * @param key
+ * @param value
+ * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapInsert(org.apache.activeblaze.group.Member,
+ * java.lang.String, java.lang.Object)
+ */
+ public void mapInsert(Member owner, String key, Object value) {
+ }
+
+ /**
+ * @param owner
+ * @param key
+ * @param value
+ * @param expired
+ * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapRemove(org.apache.activeblaze.group.Member,
+ * java.lang.String, java.lang.Object, boolean)
+ */
+ public void mapRemove(Member owner, String key, Object value, boolean expired) {
+ }
+
+ /**
+ * @param owner
+ * @param Key
+ * @param oldValue
+ * @param newValue
+ * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapUpdate(org.apache.activeblaze.group.Member,
+ * java.lang.String, java.lang.Object, java.lang.Object)
+ */
+ public void mapUpdate(Member owner, String Key, Object oldValue, Object newValue) {
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java
------------------------------------------------------------------------------
svn:eol-style = native