You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/12/01 16:32:05 UTC

svn commit: r722095 [1/2] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/processor/ main/java/org/apache...

Author: rajdavies
Date: Mon Dec  1 07:32:04 2008
New Revision: 722095

URL: http://svn.apache.org/viewvc?rev=722095&view=rev
Log:
Added cluster 'state'

Added:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/MasterChangedListener.java
      - copied, changed from r720544, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateKey.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/StateValue.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/DefaultMemberChangedListener.java   (with props)
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/BufferOutputStream.java   (with props)
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/ClusterStateTest.java   (with props)
Removed:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
Modified:
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Member.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/MemberImpl.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/ChainedProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/FragmentationProcessor.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/Packet.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketAudit.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/processor/PacketMessageType.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/BaseTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/MulticastTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/impl/transport/UdpTransport.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/AsyncGroupRequest.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IOUtils.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/IdGenerator.java
    activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/util/LRUCache.java
    activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
    activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/group/BlazeGroupChannelTest.java

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BaseService.java Mon Dec  1 07:32:04 2008
@@ -28,7 +28,9 @@
     AtomicBoolean started = new AtomicBoolean();
    
     public boolean init() throws Exception {
-        return this.initialialized.compareAndSet(false, true);
+        boolean result =  this.initialialized.compareAndSet(false, true);
+        this.initialialized.set(true);
+        return result;
     }
 
     
@@ -36,7 +38,9 @@
         if (isStarted()) {
             stop();
         }
-        return this.initialialized.compareAndSet(true, false);
+        boolean result =  this.initialialized.compareAndSet(true, false);
+        this.initialialized.set(false);
+        return result;
     }
 
     
@@ -44,15 +48,16 @@
         if (!this.initialialized.get()) {
             init();
         }
-        return this.started.compareAndSet(false, true); 
+        boolean result =  this.started.compareAndSet(false, true); 
+        this.started.set(true);
+        return result;
     }
 
     
     public boolean stop() throws Exception {
-        if (!isInitialized()) {
-            init();
-        }
-        return this.started.compareAndSet(true, false);
+        boolean result = this.started.compareAndSet(true, false);
+        this.started.set(false);
+        return result;
     }
     
     public boolean isStarted() {

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Mon Dec  1 07:32:04 2008
@@ -260,18 +260,4 @@
             }
         }
     }
-
-    /**
-     * shutdown on gc
-     * 
-     * @throws Throwable
-     * @see java.lang.Object#finalize()
-     */
-    protected void finalize() throws Throwable {
-        try {
-            shutDown();
-        } finally {
-            super.finalize();
-        }
-    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeMessage.java Mon Dec  1 07:32:04 2008
@@ -16,11 +16,6 @@
  */
 package org.apache.activeblaze;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectOutputStream;
 import java.security.Key;
 import java.util.Collection;
 import java.util.Collections;
@@ -28,7 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.activeblaze.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activeblaze.util.IOUtils;
 import org.apache.activeblaze.wire.BlazeData;
 import org.apache.activeblaze.wire.BoolType;
 import org.apache.activeblaze.wire.BufferType;
@@ -43,9 +38,6 @@
 import org.apache.activeblaze.wire.ShortType;
 import org.apache.activeblaze.wire.StringType;
 import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.protobuf.BufferInputStream;
-import org.apache.activemq.protobuf.BufferOutputStream;
-
 
 /**
  * A <CODE>BlazeMessage</CODE> object is used to send a set of name-value pairs. The names are <CODE>String</CODE>
@@ -161,38 +153,23 @@
      * @return text the default text
      * @throws Exception
      */
-    public Object getObject()  throws Exception {
-        Object result = null;
+    public Object getObject() throws Exception {
         Buffer buffer = getBuffer(DEFAULT_OBJECT_PAYLOAD);
-        InputStream is  = new BufferInputStream(buffer);
-        DataInputStream dataIn = new DataInputStream(is);
-        ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
-        result = objIn.readObject();
-        return result;
+        return IOUtils.getObject(buffer);
     }
-    
+
     /**
      * Utility method for setting a default <Code>Object</Code> payload
      * 
      * @param payload
      */
-   
     public void setObject(Object payload) {
-        BufferOutputStream bufferOut = new BufferOutputStream(1024);
-        DataOutputStream dataOut = new DataOutputStream(bufferOut);
         try {
-            ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
-            objOut.writeObject(payload);
-            objOut.flush();
-            objOut.reset();
-            objOut.close();
-        } catch (IOException e) {
+            put(DEFAULT_OBJECT_PAYLOAD, IOUtils.getBuffer(payload));
+        } catch (Exception e) {
             throw new BlazeRuntimeException(e);
         }
-        put(DEFAULT_OBJECT_PAYLOAD, bufferOut.toBuffer());
     }
-    
-    
 
     /**
      * Utility method used for when a BlazeMessage is only carrying a String
@@ -481,7 +458,7 @@
             throw new BlazeMessageFormatException(" cannot read a byte[] from " + value.getClass().getName());
         }
     }
-    
+
     /**
      * Returns a Buffer with the specified name.
      * 
@@ -500,8 +477,6 @@
             throw new BlazeMessageFormatException(" cannot read a Buffer from " + value.getClass().getName());
         }
     }
-    
-    
 
     /**
      * Returns the value of the object with the specified name.
@@ -689,7 +664,7 @@
             this.map.remove(name);
         }
     }
-    
+
     /**
      * Sets a Buffer value with the specified name into the Map.
      * 
@@ -728,8 +703,6 @@
         put(name, data);
     }
 
-   
-
     /**
      * Find out if the message contains a key This isn't recursive
      * 
@@ -866,10 +839,17 @@
         copy.content = this.content;
     }
 
+    /**
+     * @return the content data
+     */
     public BlazeData getContent() {
         return this.content;
     }
 
+    /**
+     * Set the content data
+     * @param content
+     */
     public void setContent(BlazeData content) {
         this.content = content;
     }
@@ -1109,6 +1089,4 @@
     public void setTimeStamp(long timeStamp) {
         this.timeStamp = timeStamp;
     }
-    
-    
 }
\ No newline at end of file

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java Mon Dec  1 07:32:04 2008
@@ -41,7 +41,7 @@
      * @param l
      * @throws Exception 
      */
-    public void addClusterChangedListener(ClusterChangedListener l) throws Exception;
+    public void addMasterChangedListener(MasterChangedListener l) throws Exception;
 
     /**
      * Remove a listener for cluster changes
@@ -49,7 +49,7 @@
      * @param l
      * @throws Exception 
      */
-    public void removeClusterChangedListener(ClusterChangedListener l) throws Exception;
+    public void removeMasterChangedListener(MasterChangedListener l) throws Exception;
     
     /**
      * @return the configuration
@@ -63,4 +63,9 @@
      * @throws Exception
      */
     public boolean waitForElection(int timeout) throws Exception;
+    
+    /**
+     * @return a <Code>ClusterState</Code>
+     */
+    public ClusterState getState();
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Mon Dec  1 07:32:04 2008
@@ -21,10 +21,13 @@
 import org.apache.activeblaze.group.Group;
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.util.SendRequest;
 import org.apache.activeblaze.wire.ElectionMessage;
 import org.apache.activeblaze.wire.MessageType;
 import org.apache.activeblaze.wire.PacketData;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.Message;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -35,7 +38,8 @@
  */
 public class BlazeClusterGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeClusterGroupChannel {
     private static final Log LOG = LogFactory.getLog(BlazeClusterGroupChannelImpl.class);
-    private ClusterGroup coordinatedGroup;
+    private ClusterGroup clusterGroup;
+    private ClusterState state;
 
     /**
      * Constructor
@@ -44,16 +48,61 @@
      */
     public BlazeClusterGroupChannelImpl(String name) {
         super(name);
+        this.state = new ClusterState(this);
+    }
+
+    /**
+     * @return
+     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getState()
+     */
+    public ClusterState getState() {
+        return this.state;
+    }
+
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            this.clusterGroup.init();
+            this.state.init();
+        }
+        return result;
+    }
+
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (result) {
+            this.clusterGroup.shutDown();
+            this.state.shutDown();
+        }
+        return result;
+    }
+
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            this.clusterGroup.start();
+            this.state.start();
+        }
+        return result;
+    }
+
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result) {
+            this.clusterGroup.stop();
+            this.state.stop();
+        }
+        return result;
     }
 
     /**
      * @param l
      * @throws Exception
-     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#addClusterChangedListener(org.apache.activeblaze.cluster.ClusterChangedListener)
+     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#addMasterChangedListener(org.apache.activeblaze.cluster.MasterChangedListener)
      */
-    public void addClusterChangedListener(ClusterChangedListener l) throws Exception {
+    public void addMasterChangedListener(MasterChangedListener l) throws Exception {
         init();
-        this.coordinatedGroup.addClusterChangedListener(l);
+        this.clusterGroup.addMasterChangedListener(l);
     }
 
     /**
@@ -62,8 +111,10 @@
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getMaster()
      */
     public Member getMaster() throws Exception {
-        init();
-        return this.coordinatedGroup.getMaster();
+        if (this.clusterGroup != null) {
+            return this.clusterGroup.getMaster();
+        }
+        return null;
     }
 
     /**
@@ -72,18 +123,21 @@
      * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#isMaster()
      */
     public boolean isMaster() throws Exception {
-        init();
-        return this.coordinatedGroup.isMasterMatch();
+        if (this.clusterGroup != null) {
+            return this.clusterGroup.isMasterMatch();
+        }
+        return false;
     }
 
     /**
      * @param l
      * @throws Exception
-     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.ClusterChangedListener)
+     * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.MasterChangedListener)
      */
-    public void removeClusterChangedListener(ClusterChangedListener l) throws Exception {
-        init();
-        this.coordinatedGroup.removeClusterChangedListener(l);
+    public void removeMasterChangedListener(MasterChangedListener l) throws Exception {
+        if (this.clusterGroup != null) {
+            this.clusterGroup.removeMasterChangedListener(l);
+        }
     }
 
     /**
@@ -93,7 +147,7 @@
     public BlazeClusterGroupConfiguration getConfiguration() {
         return (BlazeClusterGroupConfiguration) this.configuration;
     }
-    
+
     /**
      * @param timeout
      * @return
@@ -102,11 +156,21 @@
      */
     public boolean waitForElection(int timeout) throws Exception {
         init();
-        return this.coordinatedGroup.waitForElection(timeout);
+        return this.clusterGroup.waitForElection(timeout);
+    }
+
+    /**
+     * @return true if there is elections have finished
+     * @throws Exception
+     */
+    public boolean isElectionFinished() throws Exception {
+        init();
+        return this.clusterGroup.isElectionFinished();
     }
 
     protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
         if (isStarted()) {
+            processRequest(correlationId, data);
             MessageType type = MessageType.valueOf(data.getType());
             if (type == MessageType.BLAZE_DATA) {
                 doProcessBlazeData(data);
@@ -114,6 +178,8 @@
                 doProcessMemberData(data);
             } else if (type.equals(MessageType.ELECTION_MESSAGE)) {
                 doProcessElectionData(id, data);
+            } else if (type.equals(MessageType.STATE_DATA)) {
+                doProcessStateData(data);
             } else {
                 LOG.error("Unknown message type " + data);
             }
@@ -124,9 +190,22 @@
         return new MemberImpl(getId(), getName(), getConfiguration().getMasterWeight(), uri);
     }
 
+    public MemberImpl getLocalMember() {
+        synchronized (this.localMutex) {
+            MemberImpl local = super.getLocalMember();
+            long oldWeight = local.getMasterWeight();
+            local.getData().setMasterWeight(getConfiguration().getMasterWeight());
+            if (oldWeight != getConfiguration().getMasterWeight()) {
+                // weight changed - so do an election in case the master changed
+                this.clusterGroup.scheduleElection();
+            }
+            return local;
+        }
+    }
+
     protected Group createGroup() {
-        this.coordinatedGroup = new ClusterGroup(this);
-        return this.coordinatedGroup;
+        this.clusterGroup = new ClusterGroup(this);
+        return this.clusterGroup;
     }
 
     protected void doProcessElectionData(String id, PacketData data) throws Exception {
@@ -137,4 +216,42 @@
         ClusterGroup group = (ClusterGroup) getGroup();
         group.processElectionMessage(electionMessage, id);
     }
+
+    protected void doProcessStateData(PacketData data) throws Exception {
+        this.state.processStateData(data);
+    }
+
+    /**
+     * send Request
+     * 
+     * @param member
+     * @param destination
+     * @param message
+     * @param timeout
+     * @return
+     * @throws Exception
+     */
+    protected Message<?> sendRequest(MemberImpl to, MessageType type, Message<?> message, int timeout) throws Exception {
+        Message<?> result = null;
+        if (to != null) {
+            SendRequest request = new SendRequest();
+            PacketData data = getPacketData(type, message);
+            data.setReliable(true);
+            data.setResponseRequired(false);
+            data.setFromAddress(this.inboxURI);
+            Packet packet = new Packet(data);
+            packet.setTo(to.getAddress());
+            synchronized (this.messageRequests) {
+                this.messageRequests.put(data.getMessageId(), request);
+            }
+            this.unicast.downStream(packet);
+            data = (PacketData) request.get(timeout);
+            if (data != null) {
+                type = MessageType.valueOf(data.getType());
+                result = type.createMessage();
+                result.mergeFramed(data.getPayload());
+            }
+        }
+        return result;
+    }
 }

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java Mon Dec  1 07:32:04 2008
@@ -24,22 +24,22 @@
  *
  */
 public class BlazeClusterGroupConfiguration extends BlazeGroupConfiguration{
-    private long masterWeight = 0;
+    private int masterWeight = 0;
     private int minimumGroupSize = 1;
-    private long  awaitGroupTimeout = getHeartBeatInterval()*2;
+    private int  awaitGroupTimeout = Math.max(getHeartBeatInterval()*2,1000);
     
     
     /**
      * @return the coordinatorWeight
      */
-    public long getMasterWeight() {
+    public int getMasterWeight() {
         return this.masterWeight;
     }
 
     /**
      * @param coordinatorWeight the coordinatorWeight to set
      */
-    public void setMasterWeight(long coordinatorWeight) {
+    public void setMasterWeight(int coordinatorWeight) {
         this.masterWeight = coordinatorWeight;
     }
 
@@ -60,14 +60,14 @@
     /**
      * @return the awaitGroupTimeout
      */
-    public long getAwaitGroupTimeout() {
+    public int getAwaitGroupTimeout() {
         return this.awaitGroupTimeout;
     }
 
     /**
      * @param awaitGroupTimeout the awaitGroupTimeout to set
      */
-    public void setAwaitGroupTimeout(long awaitGroupTimeout) {
+    public void setAwaitGroupTimeout(int awaitGroupTimeout) {
         this.awaitGroupTimeout = awaitGroupTimeout;
     }
     

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java Mon Dec  1 07:32:04 2008
@@ -29,10 +29,8 @@
 import org.apache.activeblaze.group.Group;
 import org.apache.activeblaze.group.Member;
 import org.apache.activeblaze.group.MemberImpl;
-import org.apache.activeblaze.util.AsyncGroupRequest;
 import org.apache.activeblaze.wire.ElectionMessage;
 import org.apache.activeblaze.wire.ElectionType;
-import org.apache.activeblaze.wire.MemberData;
 import org.apache.activeblaze.wire.MessageType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,7 +45,7 @@
     private final BlazeClusterGroupConfiguration configuration;
     private ThreadPoolExecutor electionExecutor;
     private MemberImpl master;
-    private List<ClusterChangedListener> listeners = new CopyOnWriteArrayList<ClusterChangedListener>();
+    private List<MasterChangedListener> listeners = new CopyOnWriteArrayList<MasterChangedListener>();
     final AtomicBoolean electionFinished = new AtomicBoolean(false);
     private long startTime;
 
@@ -83,6 +81,7 @@
                             return thread;
                         }
                     });
+            election(null, true);
         }
         return result;
     }
@@ -102,6 +101,12 @@
         return result;
     }
 
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        setMaster(null);
+        return result;
+    }
+
     /**
      * @return true if there is elections have finished
      */
@@ -112,7 +117,7 @@
     void setElectionFinished(boolean flag) {
         synchronized (this.electionFinished) {
             this.electionFinished.set(flag);
-            // this.electionFinished.notifyAll();
+            this.electionFinished.notifyAll();
         }
     }
 
@@ -122,19 +127,38 @@
      * @param data
      * @throws Exception
      */
-    protected MemberImpl processMember(MemberData data) throws Exception {
-        MemberImpl result = super.processMember(data);
-        if (result != null || (!isElectionFinished() && !data.getId().equals(getLocalMember().getId()))) {
-            election(result, true);
+    protected void processMemberStarted(MemberImpl member) throws Exception {
+        if (!member.equals(getLocalMember())) {
+            synchronized (this.electionFinished) {
+                this.electionFinished.set(false);
+            }
+        }
+        super.processMemberStarted(member);
+        if (!member.equals(getLocalMember())) {
+            election(member, true);
         }
-        return result;
     }
 
     protected void processMemberStopped(MemberImpl member) throws Exception {
+        synchronized (this.electionFinished) {
+            this.electionFinished.set(false);
+        }
         super.processMemberStopped(member);
         election(null, false);
     }
 
+    void scheduleElection() {
+        try {
+            this.electionFinished.set(false);
+            broadcastHeartBeat(getLocalMember());
+            election(null, true);
+        } catch (Exception e) {
+            if (isStarted()) {
+                LOG.error("Election failed ", e);
+            }
+        }
+    }
+
     void election(final Member member, final boolean memberStarted) throws Exception {
         if (isStarted() && this.electionExecutor != null && !this.electionExecutor.isShutdown()) {
             synchronized (this.electionFinished) {
@@ -147,8 +171,10 @@
                     List<Runnable> list = new ArrayList<Runnable>(this.electionExecutor.getQueue());
                     for (Runnable r : list) {
                         ElectionService es = (ElectionService) r;
-                        es.stop();
-                        this.electionExecutor.remove(es);
+                        if (es != null) {
+                            es.stop();
+                            this.electionExecutor.remove(es);
+                        }
                     }
                 }
             ElectionService es = new ElectionService(this, member, memberStarted);
@@ -162,11 +188,12 @@
      */
     protected boolean isMasterMatch() {
         String masterId = this.master != null ? this.master.getId() : "";
-        return this.channel.getId().equals(masterId);
+        boolean result = isStarted() && this.channel.getId().equals(masterId);
+        return result;
     }
 
     protected MemberImpl getMaster() {
-        return this.master;
+        return isStarted() ? this.master : null;
     }
 
     protected void setMaster(MemberImpl member) {
@@ -177,7 +204,7 @@
         }
     }
 
-    protected void addClusterChangedListener(ClusterChangedListener l) {
+    protected void addMasterChangedListener(MasterChangedListener l) {
         this.listeners.add(l);
     }
 
@@ -187,33 +214,20 @@
      * @param l
      * @throws Exception
      */
-    protected void removeClusterChangedListener(ClusterChangedListener l) {
+    protected void removeMasterChangedListener(MasterChangedListener l) {
         this.listeners.remove(l);
     }
 
-    protected void fireClusterChanged(MemberImpl newMaster) {
-        for (ClusterChangedListener l : this.listeners) {
-            l.ClusterChanged(newMaster);
-        }
-    }
-
-    boolean callElection() throws Exception {
-        List<MemberImpl> members = new ArrayList<MemberImpl>(this.members.values());
-        List<MemberImpl> sorted = ClusterGroup.sortMemberList(members);
-        AsyncGroupRequest request = new AsyncGroupRequest();
-        boolean doCall = false;
-        for (Member member : sorted) {
-            if (this.channel.getId().equals(member.getId())) {
-                doCall = true;
-            } else if (doCall) {
-                ElectionMessage msg = new ElectionMessage();
-                msg.setMember(this.channel.getLocalMember().getData());
-                msg.setElectionType(ElectionType.ELECTION);
-                this.channel.broadcastMessage(request, msg.type(), msg);
+    protected void fireClusterChanged(final MemberImpl newMaster) {
+        if (isStarted()) {
+            for (final MasterChangedListener l : this.listeners) {
+                this.listenerService.execute(new Runnable() {
+                    public void run() {
+                        l.masterChanged(newMaster);
+                    }
+                });
             }
         }
-        boolean result = request.isSuccess(this.configuration.getHeartBeatInterval());
-        return result;
     }
 
     void processElectionMessage(ElectionMessage msg, String correlationId) throws Exception {
@@ -225,12 +239,15 @@
                 reply.setElectionType(ElectionType.ANSWER);
                 reply.setMember(this.channel.getLocalMember().getData());
                 this.channel.sendReply(from, msg.type(), reply, correlationId);
-                election(null, false);
+                // election(null, false);
             } else if (msg.getElectionType().equals(ElectionType.MASTER)) {
-                setMaster(from);
-                LOG.debug(getLocalMember() + " Master is " + from);
-                setMaster(from);
-                setElectionFinished(true);
+                if (isValidMaster(from)) {
+                    setMaster(from);
+                    setElectionFinished(true);
+                } else {
+                    // bully a new election - not a valid master
+                    scheduleElection();
+                }
             }
         }
     }
@@ -266,10 +283,30 @@
         return !isStopped() && this.electionFinished.get();
     }
 
+    protected boolean isValidMaster(MemberImpl newMaster) {
+        boolean result = newMaster.equals(getLocalMember());
+        if (!result) {
+            List<MemberImpl> sorted = ClusterGroup.sortMemberList(new ArrayList<MemberImpl>(this.members.values()));
+            int ourOrder = sorted.indexOf(getLocalMember());
+            int masterOrder = sorted.indexOf(newMaster);
+            result = masterOrder >= ourOrder;
+        }
+        return result;
+    }
+
     protected static List<MemberImpl> sortMemberList(List<MemberImpl> list) {
         Collections.sort(list, new Comparator<Member>() {
             public int compare(Member m1, Member m2) {
-                long result = m1.getCoordinatorWeight() - m2.getCoordinatorWeight();
+                if (m1 == m2) {
+                    return 0;
+                }
+                if (m1 == null) {
+                    return -1;
+                }
+                if (m2 == null) {
+                    return 1;
+                }
+                int result = m1.getMasterWeight() - m2.getMasterWeight();
                 if (result == 0) {
                     result = m1.getId().compareTo(m2.getId());
                 }

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.BlazeRuntimeException;
+
+/**
+ * Exception raised when updating Cluster State
+ * 
+ */
+public class ClusterNotMasterException extends BlazeRuntimeException {
+    private static final long serialVersionUID = -5543167215616832680L;
+
+    /**
+     * Constructs a new exception with <code>null</code> as its detail message. The cause is not initialized, and may
+     * subsequently be initialized by a call to {@link #initCause}.
+     */
+    public ClusterNotMasterException() {
+        super();
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message. The cause is not initialized, and may subsequently
+     * be initialized by a call to {@link #initCause}.
+     * 
+     * @param message
+     *            the detail message. The detail message is saved for later retrieval by the {@link #getMessage()}
+     *            method.
+     */
+    public ClusterNotMasterException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and cause.
+     * <p>
+     * Note that the detail message associated with <code>cause</code> is <i>not</i> automatically incorporated in
+     * this exception's detail message.
+     * 
+     * @param message
+     *            the detail message (which is saved for later retrieval by the {@link #getMessage()} method).
+     * @param cause
+     *            the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+     *            value is permitted, and indicates that the cause is nonexistent or unknown.)
+     */
+    public ClusterNotMasterException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause and a detail message of
+     * <tt>(cause==null ? null : cause.toString())</tt> (which typically contains the class and detail message of
+     * <tt>cause</tt>). This constructor is useful for exceptions that are little more than wrappers for other
+     * throwables (for example, {@link java.security.PrivilegedActionException}).
+     * 
+     * @param cause
+     *            the cause (which is saved for later retrieval by the {@link #getCause()} method). (A <tt>null</tt>
+     *            value is permitted, and indicates that the cause is nonexistent or unknown.)
+     */
+    public ClusterNotMasterException(Throwable cause) {
+        super(cause);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterNotMasterException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java?rev=722095&r1=722094&r2=722095&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java Mon Dec  1 07:32:04 2008
@@ -16,157 +16,914 @@
  */
 package org.apache.activeblaze.cluster;
 
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberChangedListener;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.util.IOUtils;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activeblaze.wire.StateData;
+import org.apache.activeblaze.wire.StateKeyData;
+import org.apache.activeblaze.wire.StateType;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * <P>
- * A <CODE>GroupState</CODE> is a distributed collaboration implementation that is
- * used to shared state and process messages amongst a distributed group of
- * other <CODE>Group</CODE> instances. Membership of a group is handled
+ * A <CODE>ClusterState</CODE> is a distributed collaboration implementation that is used to shared state and process
+ * messages amongst a distributed group of other <CODE>Group</CODE> instances. Membership of a group is handled
  * automatically using discovery.
- * <P>
- * The underlying transport is JMS and there are some optimizations that occur
- * for membership if used with ActiveMQ - but <CODE>Group</CODE> can be used
- * with any JMS implementation.
- * 
- * <P>
- * Updates to the group shared map are controlled by a coordinator. The
- * coordinator is elected by the member with the lowest lexicographical id -
- * based on the bully algorithm [Silberschatz et al. 1993]
- * <P>
- * The {@link #selectCordinator(Collection<Member> members)} method may be
- * overridden to implement a custom mechanism for choosing how the coordinator
- * is elected for the map.
- * <P>
- * New <CODE>Group</CODE> instances have their state updated by the
- * coordinator, and coordinator failure is handled automatically within the
- * group.
- * <P>
- * All map updates are totally ordered through the coordinator, whilst read
- * operations happen locally.
- * <P>
- * A <CODE>Group</CODE> supports the concept of owner only updates(write
- * locks), shared updates, entry expiration times and removal on owner exit -
- * all of which are optional. In addition, you can grab and release locks for
- * values in the map, independently of who created them.
- * <P>
- * In addition, members of a group can broadcast messages and implement
- * request/response with other <CODE>Group</CODE> instances.
- * 
- * <P>
- * @param <String> 
- * @param <V> 
  * 
  */
+public class ClusterState extends BaseService implements Map<String, Object>, MemberChangedListener {
+    final static Log LOG = LogFactory.getLog(ClusterState.class);
+    private boolean alwaysLock = true;
+    private boolean removeOwnedObjectsOnExit;
+    private boolean releaseLockOnExit = true;
+    private int timeToLive;
+    private int lockTimeToLive;
+    private int requestTimeout = 5000;
+    final BlazeClusterGroupChannelImpl channel;
+    private final List<ClusterStateChangedListener> clusterStateChangedListeners = new CopyOnWriteArrayList<ClusterStateChangedListener>();
+    private final Map<String, StateValue> localMap = new ConcurrentHashMap<String, StateValue>();
+    private ExecutorService stateChangedExecutor;
+    private ScheduledExecutorService expirationService;
 
-public class  ClusterState<String,V> implements Map<String,V>{
-    
-    private final BlazeClusterGroupChannelImpl channel;
     protected ClusterState(BlazeClusterGroupChannelImpl channel) {
-        this.channel=channel;
+        this.channel = channel;
     }
+
+    /**
+     * Test to see if we own the key
+     * 
+     * @param key
+     * @return true if the owner of the key
+     */
+    public boolean isOwner(String key) {
+        boolean result = false;
+        StateValue value = this.localMap.get(key);
+        if (value != null) {
+            result = value.getKey().getOwner().equals(this.channel.getLocalMember());
+        }
+        return result;
+    }
+
+    /**
+     * Get the owner of a Key
+     * 
+     * @param key
+     * @return the owner or null if the key doesnn't exist
+     */
+    public Member getOwner(String key) {
+        StateValue value = this.localMap.get(key);
+        if (value != null) {
+            return value.getKey().getOwner();
+        }
+        return null;
+    }
+
+    /**
+     * Unlock a key
+     * 
+     * @param key
+     * @throws Exception
+     */
+    public void unlock(String key) throws Exception {
+        StateKey stateKey = new StateKey(this.channel.getLocalMember(), key);
+        stateKey.setLocked(false);
+        stateKey.setRemoveOnExit(isRemoveOwnedObjectsOnExit());
+        stateKey.setReleaseLockOnExit(isReleaseLockOnExit());
+        stateKey.setTimeToLive(getTimeToLive());
+        StateData stateData = new StateData();
+        stateData.setKeyData(stateKey.getKeyData());
+        stateData.setLockWrite(true);
+        sendMasterRequest(stateData);
+    }
+
+    /**
+     * Lock a Key
+     * 
+     * @param key
+     * @throws Exception
+     */
+    public void lock(String key) throws Exception {
+        lock(key, getLockTimeToLive());
+    }
+
+    /**
+     * Lock a Key
+     * 
+     * @param key
+     * @param leaseTime
+     * @throws Exception
+     */
+    public void lock(String key, long leaseTime) throws Exception {
+        StateKey stateKey = new StateKey(this.channel.getLocalMember(), key);
+        stateKey.setLocked(true);
+        stateKey.setRemoveOnExit(isRemoveOwnedObjectsOnExit());
+        stateKey.setReleaseLockOnExit(isReleaseLockOnExit());
+        stateKey.setTimeToLive(getTimeToLive());
+        stateKey.setLockLeaseTime(leaseTime);
+        StateData stateData = new StateData();
+        stateData.setKeyData(stateKey.getKeyData());
+        stateData.setLockWrite(true);
+        sendMasterRequest(stateData);
+    }
+
     /**
      * 
      * @see java.util.Map#clear()
      */
     public void clear() {
-        // TODO Auto-generated method stub
-        
+        checkStatus();
+        if (this.localMap != null && !this.localMap.isEmpty()) {
+            Set<String> keys = null;
+            keys = new HashSet<String>(this.localMap.keySet());
+            for (String key : keys) {
+                remove(key);
+            }
+        }
+        this.localMap.clear();
     }
+
     /**
      * @param key
      * @return
      * @see java.util.Map#containsKey(java.lang.Object)
      */
-    public boolean containsKey(Object key) {
-        // TODO Auto-generated method stub
-        return false;
+    public boolean containsKey(java.lang.Object key) {
+        return this.localMap.containsKey(key);
     }
+
     /**
      * @param value
      * @return
      * @see java.util.Map#containsValue(java.lang.Object)
      */
-    public boolean containsValue(Object value) {
-        // TODO Auto-generated method stub
-        return false;
+    public boolean containsValue(java.lang.Object value) {
+        StateValue sv = new StateValue(null, value, null);
+        return this.localMap.containsValue(sv);
     }
+
     /**
      * @return
      * @see java.util.Map#entrySet()
      */
-    public Set<java.util.Map.Entry<String, V>> entrySet() {
-        // TODO Auto-generated method stub
-        return null;
+    public Set<java.util.Map.Entry<String, Object>> entrySet() {
+        Map<String, Object> result = new HashMap<String, Object>();
+        try {
+            for (StateValue entry : this.localMap.values()) {
+                String key = entry.getKey().getKey();
+                Object value = entry.getValue();
+                result.put(key, value);
+            }
+        } catch (Exception e) {
+            throw new BlazeRuntimeException(e);
+        }
+        return result.entrySet();
     }
+
     /**
      * @param key
      * @return
      * @see java.util.Map#get(java.lang.Object)
      */
-    public V get(Object key) {
-        // TODO Auto-generated method stub
-        return null;
+    public Object get(java.lang.Object key) {
+        StateValue v = this.localMap.get(key);
+        try {
+            return v != null ? v.getValue() : null;
+        } catch (Exception e) {
+            throw new BlazeRuntimeException(e);
+        }
     }
+
     /**
      * @return
      * @see java.util.Map#isEmpty()
      */
     public boolean isEmpty() {
-        // TODO Auto-generated method stub
-        return false;
+        return this.localMap.isEmpty();
     }
+
     /**
      * @return
      * @see java.util.Map#keySet()
      */
     public Set<String> keySet() {
-        // TODO Auto-generated method stub
-        return null;
+        return this.localMap.keySet();
     }
+
     /**
      * @param key
      * @param value
      * @return
      * @see java.util.Map#put(java.lang.Object, java.lang.Object)
      */
-    public V put(String key, V value) {
-        // TODO Auto-generated method stub
-        return null;
+    public Object put(String key, Object value) {
+        return put(key, value, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(),
+                getLockTimeToLive());
+    }
+
+    /**
+     * @param key
+     * @param value
+     * @param lock
+     * @param removeOnExit
+     * @param releaseLockOnExit
+     * @param timeToLive
+     * @param leaseTime
+     * @return
+     * @throws BlazeRuntimeException
+     */
+    public Object put(String key, Object value, boolean lock, boolean removeOnExit, boolean releaseLockOnExit,
+            long timeToLive, long leaseTime) throws BlazeRuntimeException {
+        checkStatus();
+        Object resultValue = null;
+        try {
+            StateKey stateKey = new StateKey(this.channel.getLocalMember(), key);
+            stateKey.setLocked(lock);
+            stateKey.setRemoveOnExit(removeOnExit);
+            stateKey.setReleaseLockOnExit(releaseLockOnExit);
+            stateKey.setTimeToLive(timeToLive);
+            stateKey.setLockLeaseTime(leaseTime);
+            StateData stateData = new StateData();
+            stateData.setKeyData(stateKey.getKeyData());
+            stateData.setStateType(StateType.INSERT);
+            stateData.setMapWrite(true);
+            stateData.setValue(IOUtils.getBuffer(value));
+            resultValue = sendMasterRequest(stateData);
+        } catch (Exception e) {
+            if (e instanceof ClusterUpdateException) {
+                throw (ClusterUpdateException) e;
+            }
+            throw new ClusterUpdateException(e);
+        }
+        return resultValue;
     }
+
     /**
      * @param t
      * @see java.util.Map#putAll(java.util.Map)
      */
-    public void putAll(Map<? extends String, ? extends V> t) {
-        // TODO Auto-generated method stub
-        
+    public void putAll(Map<? extends String, ? extends Object> t) {
+        putAll(t, isAlwaysLock(), isRemoveOwnedObjectsOnExit(), isReleaseLockOnExit(), getTimeToLive(),
+                getLockTimeToLive());
     }
+
+    /**
+     * put all
+     * 
+     * @param t
+     * @param lock
+     * @param removeOnExit
+     * @param releaseLockOnExit
+     * @param timeToLive
+     * @param lockTimeToLive
+     */
+    public void putAll(Map<? extends String, ? extends Object> t, boolean lock, boolean removeOnExit,
+            boolean releaseLockOnExit, long timeToLive, long lockTimeToLive) {
+        for (java.util.Map.Entry<? extends String, ? extends Object> entry : t.entrySet()) {
+            put(entry.getKey(), entry.getValue(), lock, removeOnExit, releaseLockOnExit, timeToLive, lockTimeToLive);
+        }
+    }
+
     /**
      * @param key
      * @return
      * @see java.util.Map#remove(java.lang.Object)
      */
-    public V remove(Object key) {
-        // TODO Auto-generated method stub
-        return null;
+    public Object remove(java.lang.Object key) {
+        checkStatus();
+        StateKey stateKey = new StateKey(this.channel.getLocalMember(), key.toString());
+        StateData stateData = new StateData();
+        stateData.setKeyData(stateKey.getKeyData());
+        stateData.setMapWrite(true);
+        stateData.setStateType(StateType.DELETE);
+        try {
+            return this.channel.sendRequest((MemberImpl) this.channel.getMaster(), stateData.type(), stateData,
+                    getRequestTimeout());
+        } catch (Exception e) {
+            throw new BlazeRuntimeException(e);
+        }
     }
+
     /**
      * @return
      * @see java.util.Map#size()
      */
     public int size() {
-        // TODO Auto-generated method stub
-        return 0;
+        return this.localMap.size();
     }
+
     /**
      * @return
      * @see java.util.Map#values()
      */
-    public Collection<V> values() {
-        // TODO Auto-generated method stub
-        return null;
+    public Collection<Object> values() {
+        List<Object> result = new ArrayList<Object>();
+        try {
+            for (StateValue v : this.localMap.values()) {
+                result.add(v.getValue());
+            }
+        } catch (Exception e) {
+            throw new BlazeRuntimeException(e);
+        }
+        return result;
+    }
+
+    /**
+     * @return the alwaysLock
+     */
+    public boolean isAlwaysLock() {
+        return this.alwaysLock;
+    }
+
+    /**
+     * @param alwaysLock
+     *            the alwaysLock to set
+     */
+    public void setAlwaysLock(boolean alwaysLock) {
+        this.alwaysLock = alwaysLock;
+    }
+
+    /**
+     * @return the removeOwnedObjectsOnExit
+     */
+    public boolean isRemoveOwnedObjectsOnExit() {
+        return this.removeOwnedObjectsOnExit;
+    }
+
+    /**
+     * @param removeOwnedObjectsOnExit
+     *            the removeOwnedObjectsOnExit to set
+     */
+    public void setRemoveOwnedObjectsOnExit(boolean removeOwnedObjectsOnExit) {
+        this.removeOwnedObjectsOnExit = removeOwnedObjectsOnExit;
+    }
+
+    /**
+     * @return the releaseLockOnExit
+     */
+    public boolean isReleaseLockOnExit() {
+        return this.releaseLockOnExit;
+    }
+
+    /**
+     * @param releaseLockOnExit
+     *            the releaseLockOnExit to set
+     */
+    public void setReleaseLockOnExit(boolean releaseLockOnExit) {
+        this.releaseLockOnExit = releaseLockOnExit;
+    }
+
+    /**
+     * @return the timeToLive
+     */
+    public int getTimeToLive() {
+        return this.timeToLive;
+    }
+
+    /**
+     * @param timeToLive
+     *            the timeToLive to set
+     */
+    public void setTimeToLive(int timeToLive) {
+        this.timeToLive = timeToLive;
+    }
+
+    /**
+     * @return the lockTimeToLive
+     */
+    public int getLockTimeToLive() {
+        return this.lockTimeToLive;
+    }
+
+    /**
+     * @param lockTimeToLive
+     *            the lockTimeToLive to set
+     */
+    public void setLockTimeToLive(int lockTimeToLive) {
+        this.lockTimeToLive = lockTimeToLive;
+    }
+
+    /**
+     * @return the requestTimeout
+     */
+    public int getRequestTimeout() {
+        return this.requestTimeout;
+    }
+
+    /**
+     * @param requestTimeout
+     *            the requestTimeout to set
+     */
+    public void setRequestTimeout(int requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            this.channel.addMemberChangedListener(this);
+        }
+        return result;
+    }
+
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (result) {
+            this.channel.removeMemberChangedListener(this);
+        }
+        return result;
+    }
+
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            this.stateChangedExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable, "Cluster State{" + ClusterState.this.channel.getLocalMember()
+                            + "}");
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+            this.expirationService = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+                public Thread newThread(Runnable r) {
+                    Thread thread = new Thread(r);
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+            Runnable expiration = new Runnable() {
+                public void run() {
+                    try {
+                        expirationSweep();
+                    } catch (Exception e) {
+                        ClusterState.LOG.error("Failed to send heartbeat", e);
+                    }
+                }
+            };
+            this.expirationService.scheduleAtFixedRate(expiration, 500, 500, TimeUnit.MILLISECONDS);
+        }
+        return result;
+    }
+
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result) {
+            this.stateChangedExecutor.shutdown();
+            this.expirationService.shutdown();
+        }
+        return result;
+    }
+
+    /**
+     * Add a <Code>ClusterStateChangedListener</Code>
+     * 
+     * @param l
+     */
+    public void addClusterStateChangedListener(ClusterStateChangedListener l) {
+        this.clusterStateChangedListeners.add(l);
+    }
+
+    /**
+     * Add a <Code>ClusterStateChangedListener</Code>
+     * 
+     * @param l
+     */
+    public void removeClusterStateChangedListener(ClusterStateChangedListener l) {
+        this.clusterStateChangedListeners.remove(l);
+    }
+
+    /**
+     * Implementation of org.apache.activeblaze.group.MemberChangedListener for listening to membership changes
+     * 
+     * @param member
+     * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
+     */
+    public void memberStarted(Member member) {
+        try {
+            if (this.channel.isMaster()) {
+                this.channel.waitForElection(0);
+                // even though we may no longer be the master - we
+                // was the master before the new node started - so
+                // we take responsibility for updating the new node
+                for (StateValue value : this.localMap.values()) {
+                    StateData newStateData = value.getData().clone();
+                    newStateData.setMapWrite(false);
+                    newStateData.setMapUpdate(true);
+                    newStateData.setStateType(StateType.SYNC);
+                    broadcastStateUpdate(newStateData, "");
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to sync up new group member: " + member);
+        }
+    }
+
+    /**
+     * Implementation of org.apache.activeblaze.group.MemberChangedListener for listening to membership changes
+     * 
+     * @param member
+     * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
+     */
+    public void memberStopped(Member member) {
+        if (isStarted()) {
+            // remove any values held by this member
+            List<StateKey> list = new ArrayList<StateKey>();
+            for (StateValue stateValue : this.localMap.values()) {
+                StateKey key = stateValue.getKey();
+                if (key.getOwner().equals(member)) {
+                    if (key.isReleaseLockOnExit()) {
+                        key.setLocked(false);
+                    }
+                    if (key.isRemoveOnExit()) {
+                        list.add(key);
+                    }
+                }
+            }
+            for (StateKey key : list) {
+                StateValue value = this.localMap.remove(key.getKey());
+                if (value != null) {
+                    try {
+                        fireMapChanged(member, key.getKey(), value.getValue(), null, false);
+                    } catch (Exception e) {
+                        LOG.warn("Failed to fireMapChanged for key " + key.getKey(), e);
+                    }
+                }
+            }
+        }
+    }
+
+    protected void processStateData(PacketData data) throws Exception {
+        MessageType type = MessageType.STATE_DATA;
+        StateData stateData = (StateData) type.createMessage();
+        Buffer payload = data.getPayload();
+        stateData.mergeFramed(payload);
+        String correlationId = "";
+        if (data.hasMessageId()) {
+            correlationId = data.getMessageId().toStringUtf8();
+        }
+        if (!stateData.getError()) {
+            if (stateData.getMapUpdate()) {
+                processMapUpdate(stateData);
+            } else if (stateData.getLockUpdate()) {
+                processLockUpdate(stateData, correlationId);
+            } else if (stateData.getLockWrite()) {
+                processLockWrite(stateData, correlationId);
+            } else if (stateData.getMapWrite()) {
+                processMapOperations(stateData, correlationId);
+            } else {
+                LOG.error("Don't know how to process " + stateData);
+            }
+        }
+    }
+
+    protected void processLockWrite(StateData stateData, String correlationId) throws Exception {
+        if (this.channel.waitForElection(0)) {
+            // reset values for when we broadcast an update
+            stateData.setLockUpdate(true);
+            stateData.setLockWrite(false);
+            StateValue stateValue = new StateValue(stateData);
+            boolean newLock = stateValue.getKey().isLocked();
+            MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember());
+            long newLockExpiration = newLock ? stateValue.getKey().getLockExpiration() : 0l;
+            if (this.channel.isMaster()) {
+                StateKey originalKey = getKey(stateValue.getKey().getKey());
+                if (originalKey != null) {
+                    if (originalKey.isLocked()) {
+                        if (!originalKey.getOwner().equals(stateValue.getKey().getOwner())) {
+                            StateValue stateReply = stateValue.copy();
+                            Serializable reply = new ClusterUpdateException("Owned by " + originalKey.getOwner());
+                            stateReply.getData().setError(true);
+                            stateReply.getData().setValue(IOUtils.getBuffer(reply));
+                            this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData()
+                                    .type(), stateReply.getData(), correlationId);
+                        } else {
+                            originalKey.setLocked(newLock);
+                            originalKey.setOwner(newOwner);
+                            originalKey.setLockExpiration(newLockExpiration);
+                            broadcastStateUpdate(stateData, correlationId);
+                        }
+                    } else {
+                        originalKey.setLocked(newLock);
+                        originalKey.setOwner(newOwner);
+                        originalKey.setLockExpiration(newLockExpiration);
+                        broadcastStateUpdate(stateData, correlationId);
+                    }
+                }
+            } else {
+                StateValue stateReply = stateValue.copy();
+                stateReply.getData().clearStateType();
+                Serializable reply = new ClusterNotMasterException(this.channel.getLocalMember() + " Not Master");
+                stateReply.getData().setError(true);
+                stateReply.getData().setValue(IOUtils.getBuffer(reply));
+                this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData().type(),
+                        stateReply.getData(), correlationId);
+            }
+        }
+    }
+
+    protected void processLockUpdate(StateData stateData, String correlationId) throws Exception {
+        if (this.channel.waitForElection(0)) {
+            StateValue stateValue = new StateValue(stateData);
+            boolean newLock = stateValue.getKey().isLocked();
+            MemberImpl newOwner = new MemberImpl(stateData.getKeyData().getMember());
+            long newLockExpiration = newLock ? stateValue.getKey().getLockExpiration() : 0l;
+            if (!this.channel.isMaster()) {
+                StateKey originalKey = getKey(stateValue.getKey().getKey());
+                if (originalKey != null) {
+                    originalKey.setLocked(newLock);
+                    originalKey.setOwner(newOwner);
+                    originalKey.setLockExpiration(newLockExpiration);
+                }
+            }
+        }
+    }
+
+    protected void processMapOperations(StateData data, String correlationId) throws Exception {
+        StateValue stateValue = new StateValue(data);
+        StateKey key = stateValue.getKey();
+        StateType stateType = stateValue.getData().getStateType();
+        if (stateType != null) {
+            boolean insert = stateType.equals(StateType.INSERT) || stateType.equals(StateType.SYNC);
+            boolean containsKey = this.localMap.containsKey(key.getKey());
+            if (this.channel.waitForElection(0)) {
+                if (this.channel.isMaster()) {
+                    if (containsKey) {
+                        StateKey originalKey = getKey(key.getKey());
+                        if (originalKey.getOwner().equals(key.getOwner()) || !originalKey.isLocked()) {
+                            StateValue old = null;
+                            if (insert) {
+                                old = this.localMap.put(key.getKey(), stateValue);
+                            } else {
+                                old = this.localMap.remove(key.getKey());
+                            }
+                            StateData newStateData = data.clone();
+                            newStateData.clearOldvalue();
+                            if (old != null && old.getData().getValue() != null) {
+                                newStateData.setOldvalue(old.getData().getValue());
+                            }
+                            newStateData.setMapWrite(false);
+                            newStateData.setMapUpdate(true);
+                            broadcastStateUpdate(newStateData, correlationId);
+                            fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), stateValue.getValue(), false);
+                        } else {
+                            StateValue stateReply = stateValue.copy();
+                            Serializable reply = new ClusterUpdateException("Owned by " + originalKey.getOwner());
+                            stateReply.getData().setValue(IOUtils.getBuffer(reply));
+                            stateReply.getData().setError(true);
+                            this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData()
+                                    .type(), stateReply.getData(), correlationId);
+                        }
+                    } else {
+                        if (insert) {
+                            this.localMap.put(key.getKey(), stateValue);
+                            StateData newStateData = data.clone();
+                            newStateData.setMapWrite(false);
+                            newStateData.setMapUpdate(true);
+                            broadcastStateUpdate(newStateData, correlationId);
+                            fireMapChanged(key.getOwner(), key.getKey(), null, stateValue.getValue(), false);
+                        } else {
+                            // this shouldn't happen - as we are trying to remove
+                            // a non-existent key
+                            LOG.warn("Cluster State in inconsistent state - master not aware of " + key.getKey()
+                                    + " from " + key.getOwner());
+                        }
+                    }
+                } else {
+                    StateValue stateReply = stateValue.copy();
+                    stateReply.getData().clearStateType();
+                    Serializable reply = new ClusterNotMasterException(this.channel.getLocalMember() + " Not Master");
+                    stateReply.getData().setError(true);
+                    stateReply.getData().setValue(IOUtils.getBuffer(reply));
+                    this.channel.sendReply((MemberImpl) stateReply.getKey().getOwner(), stateReply.getData().type(),
+                            stateReply.getData(), correlationId);
+                }
+            }
+        }
+    }
+
+    protected void processMapUpdate(StateData data) throws Exception {
+        StateKeyData skd = data.getKeyData();
+        StateKey key = new StateKey(skd);
+        StateValue stateValue = new StateValue(data);
+        boolean containsKey = this.localMap.containsKey(key.getKey());
+        if (this.channel.waitForElection(0)) {
+            boolean insert = data.getStateType().equals(StateType.SYNC) || data.getStateType().equals(StateType.INSERT);
+            if (!this.channel.isMaster() || data.getStateType().equals(StateType.SYNC)) {
+                if (containsKey) {
+                    if (key.isLockExpired()) {
+                        StateValue old = this.localMap.get(key.getKey());
+                        if (old != null) {
+                            old.getKey().setLocked(false);
+                        }
+                    } else {
+                        StateValue old = null;
+                        if (insert) {
+                            old = this.localMap.put(key.getKey(), stateValue);
+                        } else {
+                            old = this.localMap.remove(key.getKey());
+                            StateData copy = data.clone();
+                            copy.clearValue();
+                            copy.clearOldvalue();
+                            stateValue = new StateValue(copy);
+                        }
+                        fireMapChanged(key.getOwner(), key.getKey(), old.getValue(), stateValue.getValue(), data
+                                .getExpired());
+                    }
+                } else {
+                    if (insert) {
+                        this.localMap.put(key.getKey(), stateValue);
+                        fireMapChanged(key.getOwner(), key.getKey(), null, stateValue.getValue(), false);
+                    }
+                }
+            }
+        }
+    }
+
+    protected void checkStatus() throws RuntimeException {
+        if (!isStarted()) {
+            throw new IllegalStateException("ClusterState " + this.channel.getName() + " not started");
+        }
+        try {
+            this.channel.waitForElection(0);
+        } catch (Exception e) {
+            if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+            }
+            throw new BlazeRuntimeException(e);
+        }
+    }
+
+    protected void expirationSweep() throws Exception {
+        if (this.channel.waitForElection(0)) {
+            if (this.channel.isMaster() && isStarted()) {
+                List<String> expiredMessages = null;
+                List<StateValue> expiredLocks = null;
+                long currentTime = System.currentTimeMillis();
+                for (StateValue value : this.localMap.values()) {
+                    StateKey key = value.getKey();
+                    if (key.isExpired(currentTime)) {
+                        if (expiredMessages == null) {
+                            expiredMessages = new ArrayList<String>();
+                        }
+                        expiredMessages.add(key.getKey());
+                    } else if (key.isLockExpired(currentTime)) {
+                        key.setLocked(false);
+                        if (expiredLocks == null) {
+                            expiredLocks = new ArrayList<StateValue>();
+                        }
+                        expiredLocks.add(value);
+                    }
+                }
+                // do the actual removal of entries in a separate thread
+                if (expiredMessages != null) {
+                    final List<String> expire = expiredMessages;
+                    this.stateChangedExecutor.execute(new Runnable() {
+                        public void run() {
+                            try {
+                                doMessageExpiration(expire);
+                            } catch (Exception e) {
+                                LOG.error("Message expiration failed", e);
+                            }
+                        }
+                    });
+                }
+                if (expiredLocks != null) {
+                    final List<StateValue> expire = expiredLocks;
+                    this.stateChangedExecutor.execute(new Runnable() {
+                        public void run() {
+                            try {
+                                doLockExpiration(expire);
+                            } catch (Exception e) {
+                                LOG.error("Lock expiration failed", e);
+                            }
+                        }
+                    });
+                }
+            }
+        }
+    }
+
+    protected void doMessageExpiration(List<String> list) throws Exception {
+        if (isStarted() && this.channel.isElectionFinished() && this.channel.isMaster()) {
+            for (String k : list) {
+                StateValue old = this.localMap.remove(k);
+                if (old != null) {
+                    StateValue value = old.copy();
+                    value.getData().setStateType(StateType.DELETE);
+                    value.getData().setExpired(true);
+                    value.getData().clearMapWrite();
+                    value.getData().setMapUpdate(true);
+                    broadcastStateUpdate(value.getData(), "");
+                    fireMapChanged(new MemberImpl(value.getData().getKeyData().getMember()), k, old.getValue(), null,
+                            true);
+                }
+            }
+        }
+    }
+
+    protected StateKey getKey(String key) {
+        StateValue stateValue = this.localMap != null ? this.localMap.get(key) : null;
+        return stateValue != null ? stateValue.getKey() : null;
+    }
+
+    protected void doLockExpiration(List<StateValue> list) throws Exception {
+        if (isStarted() && this.channel.isElectionFinished() && this.channel.isMaster()) {
+            for (StateValue value : list) {
+                StateValue copy = value.copy();
+                copy.getData().setStateType(StateType.DELETE);
+                copy.getData().setLockExpired(true);
+                broadcastStateUpdate(copy.getData(), "");
+            }
+        }
+    }
+
+    protected void fireMapChanged(final Member owner, final String key, final Object oldValue, final Object newValue,
+            final boolean expired) {
+        if (isStarted() && this.stateChangedExecutor != null && !this.stateChangedExecutor.isShutdown()) {
+            this.stateChangedExecutor.execute(new Runnable() {
+                public void run() {
+                    doFireMapChanged(owner, key, oldValue, newValue, expired);
+                }
+            });
+        }
+    }
+
+    protected void doFireMapChanged(Member owner, String key, Object oldValue, Object newValue, boolean expired) {
+        if (isStarted()) {
+            for (ClusterStateChangedListener l : this.clusterStateChangedListeners) {
+                if (oldValue == null) {
+                    l.mapInsert(owner, key, newValue);
+                } else if (newValue == null) {
+                    l.mapRemove(owner, key, oldValue, expired);
+                } else {
+                    l.mapUpdate(owner, key, oldValue, newValue);
+                }
+            }
+        }
+    }
+
+    protected void broadcastStateUpdate(StateData value, String correlationId) {
+        if (isStarted()) {
+            try {
+                this.channel.broadcastMessage(value.type(), value, correlationId);
+            } catch (Exception e) {
+                if (isStarted()) {
+                    LOG.error("Failed to send StateData " + value, e);
+                }
+            }
+        }
+    }
+
+    protected Object sendMasterRequest(StateData stateData) throws Exception {
+        int retryCount = 0;
+        MemberImpl master = null;
+        while (retryCount < 5) {
+            this.channel.waitForElection(0);
+            master = (MemberImpl) this.channel.getMaster();
+            StateData resultData = (StateData) this.channel.sendRequest(master, stateData.type(), stateData,
+                    getRequestTimeout());
+            retryCount++;
+            if (resultData != null) {
+                Object resultValue = IOUtils.getObject(resultData.getValue());
+                if (resultValue instanceof ClusterNotMasterException) {
+                    LOG.warn(this.getLocal().getName() + " Request sent to an old master " + master
+                            + "  - resending to new master: " + this.channel.getMaster());
+                    Thread.sleep(1000);
+                    continue;
+                }
+                if (resultValue instanceof ClusterUpdateException) {
+                    throw (ClusterUpdateException) resultValue;
+                }
+                Object result = IOUtils.getObject(resultData.getOldvalue());
+                return result;
+            }
+        }
+        throw new ClusterUpdateException(getLocal() + " Request timed out to " + master);
+    }
+
+    protected MemberImpl getLocal() {
+        return this.channel.getLocalMember();
     }
 }
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.group.Member;
+
+
+
+/**
+ *Get notifications about changes to the state of the map
+ *
+ */
+public interface ClusterStateChangedListener {
+    
+    /**
+     * Called when a key/value pair is inserted into the map
+     * @param owner 
+     * @param key
+     * @param value 
+     */
+    void mapInsert(Member owner,String key, Object value);
+    
+    /**
+     * Called when a key value is updated in the map
+     * @param owner
+     * @param Key
+     * @param oldValue
+     * @param newValue
+     */
+    void mapUpdate(Member owner,String Key,Object oldValue,Object newValue);
+    
+    /**
+     * Called when a key value is removed from the map
+     * @param owner
+     * @param key
+     * @param value
+     * @param expired
+     */
+    void mapRemove(Member owner,String key, Object value,boolean expired);
+}
\ No newline at end of file

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterStateChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.BlazeRuntimeException;
+
+/**
+ * Exception raised when updating Cluster State
+ *
+ */
+public class ClusterUpdateException extends BlazeRuntimeException{
+   
+    private static final long serialVersionUID = 8778617962968095062L;
+
+    /**
+     * Constructs a new exception with <code>null</code> as its detail message.
+     * The cause is not initialized, and may subsequently be initialized by a
+     * call to {@link #initCause}.
+     */
+    public ClusterUpdateException() {
+    super();
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message.  The
+     * cause is not initialized, and may subsequently be initialized by
+     * a call to {@link #initCause}.
+     *
+     * @param   message   the detail message. The detail message is saved for 
+     *          later retrieval by the {@link #getMessage()} method.
+     */
+    public ClusterUpdateException(String message) {
+    super(message);
+    }
+
+    /**
+     * Constructs a new exception with the specified detail message and
+     * cause.  <p>Note that the detail message associated with
+     * <code>cause</code> is <i>not</i> automatically incorporated in
+     * this exception's detail message.
+     *
+     * @param  message the detail message (which is saved for later retrieval
+     *         by the {@link #getMessage()} method).
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A <tt>null</tt> value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     */
+    public ClusterUpdateException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Constructs a new exception with the specified cause and a detail
+     * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+     * typically contains the class and detail message of <tt>cause</tt>).
+     * This constructor is useful for exceptions that are little more than
+     * wrappers for other throwables (for example, {@link
+     * java.security.PrivilegedActionException}).
+     *
+     * @param  cause the cause (which is saved for later retrieval by the
+     *         {@link #getCause()} method).  (A <tt>null</tt> value is
+     *         permitted, and indicates that the cause is nonexistent or
+     *         unknown.)
+     */
+    public ClusterUpdateException(Throwable cause) {
+        super(cause);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterUpdateException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java?rev=722095&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java Mon Dec  1 07:32:04 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.group.Member;
+
+/**
+ * Default implementation of ClusterStateListener
+ * 
+ */
+public class DefaultClusterStateListener implements ClusterStateChangedListener {
+    /**
+     * @param owner
+     * @param key
+     * @param value
+     * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapInsert(org.apache.activeblaze.group.Member,
+     *      java.lang.String, java.lang.Object)
+     */
+    public void mapInsert(Member owner, String key, Object value) {
+    }
+
+    /**
+     * @param owner
+     * @param key
+     * @param value
+     * @param expired
+     * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapRemove(org.apache.activeblaze.group.Member,
+     *      java.lang.String, java.lang.Object, boolean)
+     */
+    public void mapRemove(Member owner, String key, Object value, boolean expired) {
+    }
+
+    /**
+     * @param owner
+     * @param Key
+     * @param oldValue
+     * @param newValue
+     * @see org.apache.activeblaze.cluster.ClusterStateChangedListener#mapUpdate(org.apache.activeblaze.group.Member,
+     *      java.lang.String, java.lang.Object, java.lang.Object)
+     */
+    public void mapUpdate(Member owner, String Key, Object oldValue, Object newValue) {
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/DefaultClusterStateListener.java
------------------------------------------------------------------------------
    svn:eol-style = native