You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/21 21:44:43 UTC

svn commit: r719706 [3/6] - in /activemq/activemq-blaze: ./ branches/ tags/ trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/ trunk/src/main/java/org/ trunk/src/main/java/org/apache/ trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.net.URI;
+import org.apache.activeblaze.group.BlazeGroupChannelImpl;
+import org.apache.activeblaze.group.Group;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.wire.ElectionMessage;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication and maintains a coordinator
+ * (elected leader) for the group
+ * 
+ */
+public class BlazeCoordinatedGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeCoordinatedGroupChannel {
+    private static final Log LOG = LogFactory.getLog(BlazeCoordinatedGroupChannelImpl.class);
+    private CoordinatedGroup coordinatedGroup;
+
+    /**
+     * Constructor
+     * 
+     * @param name
+     */
+    public BlazeCoordinatedGroupChannelImpl(String name) {
+        super(name);
+    }
+
+    /**
+     * @param l
+     * @throws Exception
+     * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#addCoordinatorChangedListener(org.apache.activeblaze.coordinated.CoordinatorChangedListener)
+     */
+    public void addCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception {
+        init();
+        this.coordinatedGroup.addCoordinatorChangedListener(l);
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#getCoordinator()
+     */
+    public Member getCoordinator() throws Exception {
+        init();
+        return this.coordinatedGroup.getCoordinator();
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#isCoordinator()
+     */
+    public boolean isCoordinator() throws Exception {
+        init();
+        return this.coordinatedGroup.isCoordinatorMatch();
+    }
+
+    /**
+     * @param l
+     * @throws Exception
+     * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#removeMemberChangedListener(org.apache.activeblaze.coordinated.CoordinatorChangedListener)
+     */
+    public void removeCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception {
+        init();
+        this.coordinatedGroup.removeCoordinatorChangedListener(l);
+    }
+
+    /**
+     * @return
+     * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#getCoordinatedGroupConfiguration()
+     */
+    public BlazeCoordinatedGroupConfiguration getCoordinatedGroupConfiguration() {
+        return (BlazeCoordinatedGroupConfiguration) getGroupConfiguration();
+    }
+    
+    /**
+     * @param timeout
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#waitForElection(int)
+     */
+    public boolean waitForElection(int timeout) throws Exception {
+        init();
+        return this.coordinatedGroup.waitForElection(timeout);
+    }
+
+    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+        if (isStarted()) {
+            MessageType type = MessageType.valueOf(data.getType());
+            if (type == MessageType.BLAZE_DATA) {
+                doProcessBlazeData(data);
+            } else if (type == MessageType.MEMBER_DATA) {
+                doProcessMemberData(data);
+            } else if (type.equals(MessageType.ELECTION_MESSAGE)) {
+                doProcessElectionData(id, data);
+            } else {
+                LOG.error("Unknown message type " + data);
+            }
+        }
+    }
+
+    protected MemberImpl createLocal(URI uri) throws Exception {
+        return new MemberImpl(getId(), getName(), getCoordinatedGroupConfiguration().getCoordinatorWeight(), uri);
+    }
+
+    protected Group createGroup() {
+        this.coordinatedGroup = new CoordinatedGroup(this);
+        return this.coordinatedGroup;
+    }
+
+    protected void doProcessElectionData(String id, PacketData data) throws Exception {
+        MessageType type = MessageType.ELECTION_MESSAGE;
+        ElectionMessage electionMessage = (ElectionMessage) type.createMessage();
+        Buffer payload = data.getPayload();
+        electionMessage.mergeFramed(payload);
+        CoordinatedGroup group = (CoordinatedGroup) getGroup();
+        group.processElectionMessage(electionMessage, id);
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import org.apache.activeblaze.BlazeConfiguration;
+import org.apache.activeblaze.group.BlazeGroupConfiguration;
+
+/**
+ * Configuration for a BlazeCoordinatedGroupChannel
+ *
+ */
+public class BlazeCoordinatedGroupConfiguration extends BlazeGroupConfiguration{
+    private long coordinatorWeight = 0;
+    private int minimumGroupSize = 1;
+    private long  awaitGroupTimeout = getHeartBeatInterval()*2;
+    
+    
+    /**
+     * @return the coordinatorWeight
+     */
+    public long getCoordinatorWeight() {
+        return this.coordinatorWeight;
+    }
+
+    /**
+     * @param coordinatorWeight the coordinatorWeight to set
+     */
+    public void setCoordinatorWeight(long coordinatorWeight) {
+        this.coordinatorWeight = coordinatorWeight;
+    }
+
+    /**
+     * @return the minimumGroupSize
+     */
+    public int getMinimumGroupSize() {
+        return this.minimumGroupSize;
+    }
+
+    /**
+     * @param minimumGroupSize the minimumGroupSize to set
+     */
+    public void setMinimumGroupSize(int minimumGroupSize) {
+        this.minimumGroupSize = minimumGroupSize;
+    }
+
+    /**
+     * @return the awaitGroupTimeout
+     */
+    public long getAwaitGroupTimeout() {
+        return this.awaitGroupTimeout;
+    }
+
+    /**
+     * @param awaitGroupTimeout the awaitGroupTimeout to set
+     */
+    public void setAwaitGroupTimeout(long awaitGroupTimeout) {
+        this.awaitGroupTimeout = awaitGroupTimeout;
+    }
+    
+    protected BlazeConfiguration newInstance() {
+        return new BlazeCoordinatedGroupConfiguration();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activeblaze.group.AsyncGroupRequest;
+import org.apache.activeblaze.group.Group;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.wire.ElectionMessage;
+import org.apache.activeblaze.wire.ElectionType;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Implementation of Group State
+ * 
+ */
+public class CoordinatedGroup extends Group {
+    static final Log LOG = LogFactory.getLog(CoordinatedGroup.class);
+    final BlazeCoordinatedGroupChannelImpl channel;
+    private final BlazeCoordinatedGroupConfiguration configuration;
+    private ThreadPoolExecutor electionExecutor;
+    private MemberImpl coordinator;
+    private List<CoordinatorChangedListener> listeners = new CopyOnWriteArrayList<CoordinatorChangedListener>();
+    final AtomicBoolean electionFinished = new AtomicBoolean(false);
+    private long startTime;
+
+    /**
+     * Constructor
+     * 
+     * @param local
+     * @param channel
+     * @param transport
+     * @param config
+     */
+    protected CoordinatedGroup(BlazeCoordinatedGroupChannelImpl channel) {
+        super(channel);
+        this.channel = channel;
+        this.coordinator = this.channel.getLocalMember();
+        this.configuration = channel.getCoordinatedGroupConfiguration();
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#start()
+     */
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            this.startTime = System.currentTimeMillis();
+            this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+                        public Thread newThread(Runnable runnable) {
+                            Thread thread = new Thread(runnable, "Election{" + CoordinatedGroup.this.channel.getId()
+                                    + "}");
+                            thread.setDaemon(true);
+                            return thread;
+                        }
+                    });
+        }
+        return result;
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#stop()
+     */
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result) {
+            if (this.electionExecutor != null) {
+                this.electionExecutor.shutdownNow();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return true if there is elections have finished
+     */
+    public boolean isElectionFinished() {
+        return this.electionFinished.get();
+    }
+
+    void setElectionFinished(boolean flag) {
+        synchronized (this.electionFinished) {
+            this.electionFinished.set(flag);
+            // this.electionFinished.notifyAll();
+        }
+    }
+
+    /**
+     * Process a new member
+     * 
+     * @param data
+     * @throws Exception
+     */
+    protected MemberImpl processMember(MemberData data) throws Exception {
+        MemberImpl result = super.processMember(data);
+        if (result != null || (!isElectionFinished() && !data.getId().equals(getLocalMember().getId()))) {
+            election(result, true);
+        }
+        return result;
+    }
+
+    protected void processMemberStopped(MemberImpl member) throws Exception {
+        super.processMemberStopped(member);
+        election(null, false);
+    }
+
+    void election(final Member member, final boolean memberStarted) throws Exception {
+        if (isStarted() && this.electionExecutor != null && !this.electionExecutor.isShutdown()) {
+            synchronized (this.electionFinished) {
+                this.electionFinished.set(false);
+            }
+            if (this.members.size() >= getConfiguration().getMinimumGroupSize()
+                    || (getConfiguration().getAwaitGroupTimeout() + this.startTime) < System.currentTimeMillis())
+                synchronized (this.electionExecutor) {
+                    // remove any queued election tasks
+                    List<Runnable> list = new ArrayList<Runnable>(this.electionExecutor.getQueue());
+                    for (Runnable r : list) {
+                        ElectionService es = (ElectionService) r;
+                        es.stop();
+                        this.electionExecutor.remove(es);
+                    }
+                }
+            ElectionService es = new ElectionService(this, member, memberStarted);
+            es.start();
+            this.electionExecutor.execute(es);
+        }
+    }
+
+    /**
+     * @return true if the coordinator for the map
+     */
+    protected boolean isCoordinatorMatch() {
+        String coordinatorId = this.coordinator != null ? this.coordinator.getId() : "";
+        return this.channel.getId().equals(coordinatorId);
+    }
+
+    protected MemberImpl getCoordinator() {
+        return this.coordinator;
+    }
+
+    protected void setCoordinator(MemberImpl member) {
+        this.coordinator = member;
+    }
+
+    protected void addCoordinatorChangedListener(CoordinatorChangedListener l) {
+        this.listeners.add(l);
+    }
+
+    /**
+     * Remove a listener for membership changes
+     * 
+     * @param l
+     * @throws Exception
+     */
+    protected void removeCoordinatorChangedListener(CoordinatorChangedListener l) {
+        this.listeners.remove(l);
+    }
+
+    protected void fireCoordinatorChanged(MemberImpl newCoordinator) {
+        for (CoordinatorChangedListener l : this.listeners) {
+            l.coordinatorChanged(newCoordinator);
+        }
+    }
+
+    boolean callElection() throws Exception {
+        List<MemberImpl> members = new ArrayList<MemberImpl>(this.members.values());
+        List<MemberImpl> sorted = CoordinatedGroup.sortMemberList(members);
+        AsyncGroupRequest request = new AsyncGroupRequest();
+        boolean doCall = false;
+        for (Member member : sorted) {
+            if (this.channel.getId().equals(member.getId())) {
+                doCall = true;
+            } else if (doCall) {
+                ElectionMessage msg = new ElectionMessage();
+                msg.setMember(this.channel.getLocalMember().getData());
+                msg.setElectionType(ElectionType.ELECTION);
+                this.channel.broadcastMessage(request, msg.type(), msg);
+            }
+        }
+        boolean result = request.isSuccess(this.configuration.getHeartBeatInterval());
+        return result;
+    }
+
+    void processElectionMessage(ElectionMessage msg, String correlationId) throws Exception {
+        MemberImpl from = new MemberImpl(msg.getMember());
+        if (from != null && !from.getId().equals(getLocalMember().getId())) {
+            LOG.debug(getLocalMember()+" Election message "+ msg.getElectionType() + " from " + from);
+            if (msg.getElectionType().equals(ElectionType.ELECTION)) {
+                ElectionMessage reply = new ElectionMessage();
+                reply.setElectionType(ElectionType.ANSWER);
+                reply.setMember(this.channel.getLocalMember().getData());
+                this.channel.sendReply(from, msg.type(), reply, correlationId);
+                election(null, false);
+            } else if (msg.getElectionType().equals(ElectionType.COORDINATOR)) {
+                this.coordinator=from;
+                LOG.debug(getLocalMember()+" Coordinator is "+ from);
+                setElectionFinished(true);
+            }
+        }
+    }
+
+    void broadcastElectionType(ElectionType type) throws Exception {
+        if (isStarted()) {
+            ElectionMessage msg = new ElectionMessage();
+            msg.setMember(this.channel.getLocalMember().getData());
+            msg.setElectionType(type);
+            this.channel.broadcastMessage(MessageType.ELECTION_MESSAGE, msg);
+        }
+    }
+
+    boolean waitForElection(int timeout) throws Exception {
+        long deadline = 0;
+        if (timeout > 0) {
+            deadline = System.currentTimeMillis() + timeout;
+        }
+        synchronized (this.electionFinished) {
+            while (isStarted() && !this.electionFinished.get()) {
+                try {
+                    this.electionFinished.wait(timeout);
+                } catch (InterruptedException e) {
+                    LOG.warn("Interrupted in waitForElection");
+                    stop();
+                }
+                if (timeout > 0) {
+                    timeout = (int) Math.max(deadline - System.currentTimeMillis(), 0l);
+                }
+            }
+        }
+        return !isStopped() && this.electionFinished.get();
+    }
+
+    protected static List<MemberImpl> sortMemberList(List<MemberImpl> list) {
+        Collections.sort(list, new Comparator<Member>() {
+            public int compare(Member m1, Member m2) {
+                long result = m1.getCoordinatorWeight() - m2.getCoordinatorWeight();
+                if (result == 0) {
+                    result = m1.getId().compareTo(m2.getId());
+                }
+                return (int) result;
+            }
+        });
+        return list;
+    }
+
+    /**
+     * @return the configuration
+     */
+    public BlazeCoordinatedGroupConfiguration getConfiguration() {
+        return this.configuration;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberChangedListener;
+
+/**
+ * A listener for coordinator changes to a group
+ *
+ */
+public interface CoordinatorChangedListener extends MemberChangedListener {
+    /**
+     * Fired when a coordinator changes in the group
+     * @param newCoordinator
+     */
+    void coordinatorChanged(Member newCoordinator);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.wire.ElectionType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Responsible for calling an election amongst the members and deciding a 
+ * coordinator
+ *
+ */
+class ElectionService extends BaseService implements Runnable {
+    private static final Log LOG = LogFactory.getLog(ElectionService.class);
+    private final CoordinatedGroup group;
+    private Member member;
+    ElectionService(CoordinatedGroup group,Member member, boolean memberStarted) {
+        this.group=group;
+        this.member = member;
+    }
+
+    
+    public void run() {
+        try {
+            doElection();
+        } catch (Exception e) {
+           LOG.error("Failed to run election",e);
+        }
+    }
+
+    void doElection() throws Exception {
+        List<MemberImpl> members = new ArrayList<MemberImpl>(this.group.getMembersImpl());
+        if ((this.member == null || (!this.member.getId().equals(this.group.getId()) || members.size() == this.group.getConfiguration().getMinimumGroupSize()))) {
+            
+            // call an election
+            while (!this.group.callElection() && this.group.isStarted() && isStarted())
+                ;
+            if (this.group.isStarted() && isStarted()) {
+                
+                this.group.setCoordinator(selectCordinator(members));
+                if (this.group.isCoordinatorMatch()) {
+                    this.group.broadcastElectionType(ElectionType.COORDINATOR);
+                }
+                if (!this.group.isElectionFinished() && isStarted()) {
+                    //ok - lets just wait for more members to show
+                    //we could be the coordinator now - but best to check
+                    try {
+                        synchronized (this.group.electionFinished) {
+                            this.group.electionFinished.wait(this.group.getConfiguration().getHeartBeatInterval() * 2);
+                        }
+                    } catch (InterruptedException e) {
+                    }
+                }
+                if (!this.group.isElectionFinished() && isStarted()) {
+                    // we must be the coordinator
+                    this.group.setCoordinator(this.group.getLocalMember());
+                    this.group.setElectionFinished(true);
+                    LOG.debug(this.group.getLocalMember()+" We are the Coordinator ");
+                    this.group.broadcastElectionType(ElectionType.COORDINATOR);
+                }
+            }
+        }
+    }
+    
+    protected MemberImpl selectCordinator(List<MemberImpl> list) throws Exception {
+        List<MemberImpl> sorted = CoordinatedGroup.sortMemberList(list);
+        MemberImpl result = sorted.isEmpty() ? this.group.getLocalMember() : sorted
+                .get(list.size() - 1);
+        return result;
+    }
+
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Coordinated group messaging
+
+</body>
+</html>
\ No newline at end of file

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * keep track of multiple requests
+ *
+ */
+ public class AsyncGroupRequest implements RequestCallback {
+ private final Object mutex = new Object();
+    
+    private Set<Buffer> requests = new HashSet<Buffer>();
+
+    public void add(Buffer id, SendRequest request) {
+        request.setCallback(this);
+        this.requests.add(id);
+    }
+    
+    /**
+     * Wait for requests
+     * @param timeout
+     * @return
+     */
+    public boolean isSuccess(long timeout) {
+        long deadline = System.currentTimeMillis() + timeout;
+        while (!this.requests.isEmpty()) {
+            synchronized (this.mutex) {
+                try {
+                    this.mutex.wait(timeout);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+            timeout = Math.max(deadline - System.currentTimeMillis(), 0);
+        }
+        return this.requests.isEmpty();
+    }
+
+    
+    public void finished(Buffer id) {
+        synchronized(this.mutex) {
+            this.requests.remove(id);
+            if (this.requests.isEmpty()) {
+                this.mutex.notify();
+            }
+        }
+        
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.Set;
+import org.apache.activeblaze.BlazeChannel;
+import org.apache.activeblaze.BlazeMessage;
+
+/**
+ * <P>
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
+ * 
+ */
+public interface BlazeGroupChannel extends BlazeChannel {
+    /**
+     * @return the name of this channel
+     */
+    public String getName();
+
+    /**
+     * Send a message to an individual member
+     * 
+     * @param member
+     * @param message
+     * @throws Exception
+     */
+    public void send(Member member, BlazeMessage message) throws Exception;
+
+    /**
+     * Send a message to an individual member and wait for a response
+     * 
+     * @param member
+     * @param message
+     * @return the response
+     * @throws Exception
+     */
+    public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception;
+
+    /**
+     * Send a message to an individual member and wait for a response
+     * 
+     * @param member
+     * @param message
+     * @param timeout
+     *            time in milliseconds to wait for a response
+     * @return a response of null if timed out
+     * @throws Exception
+     */
+    public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception;
+
+    /**
+     * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion
+     * 
+     * @param destination
+     * @param message
+     * @throws Exception
+     */
+    public void send(String destination, BlazeMessage message) throws Exception;
+
+    /**
+     * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
+     * for a response
+     * 
+     * @param destination
+     * @param message
+     * @return a response
+     * @throws Exception
+     */
+    public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception;
+
+    /**
+     * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
+     * for a response
+     * 
+     * @param destination
+     * @param message
+     * @param timeout -
+     *            time in milliseconds to wait for a response
+     * @return a response of null if timed out
+     * @throws Exception
+     */
+    public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception;
+
+    /**
+     * Send a response message to an original message - for request/response
+     * 
+     * @param to
+     *            the Member to send a response to
+     * @param response
+     *            the message to send in a response
+     * @param correlationId
+     *            the associated id from the original message
+     * @throws Exception
+     */
+    public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception;
+
+    /**
+     * @return the inboxListener
+     */
+    public BlazeQueueListener getInboxListener();
+
+    /**
+     * @param inboxListener
+     *            the inboxListener to set
+     */
+    public void setInboxListener(BlazeQueueListener inboxListener);
+
+    /**
+     * @return the configuration
+     */
+    public BlazeGroupConfiguration getGroupConfiguration();
+
+    /**
+     * @return a set of the members
+     */
+    public Set<Member> getMembers();
+
+    /**
+     * Get a member by its unique id
+     * 
+     * @param id
+     * @return
+     */
+    public Member getMemberById(String id);
+
+    /**
+     * Return a member of the Group with the matching name
+     * 
+     * @param name
+     * @return
+     */
+    public Member getMemberByName(String name);
+    
+    /**
+     * Will wait for a member to advertise itself if not available
+     * @param name
+     * @param timeout
+     * @return the member or null
+     * @throws InterruptedException 
+     */
+    public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException;
+
+    /**
+     * @return the local member that represents this <CODE>Group</CODE> instance
+     */
+    public Member getLocalMember();
+
+    /**
+     * Add a listener for membership changes
+     * 
+     * @param l
+     */
+    public void addMemberChangedListener(MemberChangedListener l);
+
+    /**
+     * Remove a listener for membership changes
+     * 
+     * @param l
+     */
+    public void removeMemberChangedListener(MemberChangedListener l);
+
+    /**
+     * Add a listener for messages
+     * 
+     * @param destination
+     * @param l
+     * @throws Exception 
+     */
+    public void addBlazeQueueMessageListener(String destination, BlazeQueueListener l) throws Exception;
+
+    /**
+     * Remove a listener for messages
+     * 
+     * @param destination
+     * @return the removed listener
+     * @throws Exception 
+     */
+    public BlazeQueueListener removeBlazeQueueMessageListener(String destination) throws Exception;
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.BlazeChannelFactory;
+
+/**
+ * Factory class for creating <Code>BlazeGroupChannel</CODE>
+ */
+public class BlazeGroupChannelFactory extends BlazeChannelFactory {
+    /**
+     * Default Constructor
+     */
+    public BlazeGroupChannelFactory() {
+        super(new BlazeGroupConfiguration());
+    }
+
+    /**
+     * Construct a factory to use the passed Configuration
+     * 
+     * @param config
+     */
+    public BlazeGroupChannelFactory(BlazeGroupConfiguration config) {
+        super(config);
+    }
+
+    /**
+     * Create a GroupChannel
+     * 
+     * @param name
+     * @return the Channel
+     * @throws Exception 
+     */
+    public BlazeGroupChannel createGroupChannel(String name) throws Exception {
+        BlazeGroupChannelImpl result = new BlazeGroupChannelImpl(name);
+        result.setConfiguration(getConfiguration().copy());
+        return result;
+    }
+
+    /**
+     * @return the configuration
+     */
+    public BlazeGroupConfiguration getConfiguration() {
+        return (BlazeGroupConfiguration) super.getConfiguration();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,647 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activeblaze.BlazeChannelImpl;
+import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.BlazeTopicListener;
+import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.PropertyUtil;
+import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.DestinationData;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * <P>
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
+ * 
+ */
+public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
+    private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
+    private final String name;
+    private Processor unicast;
+    private BaseTransport groupManagementTransport;
+    private InetSocketAddress toManagementAddress;
+    private MemberImpl local;
+    private BlazeQueueListener inboxListener;
+    private Map<Buffer, SendRequest> messageRequests = new HashMap<Buffer, SendRequest>();
+    private Map<Buffer, BlazeQueueListener> queueMessageListenerMap = new ConcurrentHashMap<Buffer, BlazeQueueListener>();
+    private Group group;
+    private Buffer inboxURI;
+    private final Object localMutex = new Object();
+
+    /**
+     * Constructor
+     * 
+     * @param name
+     */
+    protected BlazeGroupChannelImpl(String name) {
+        super();
+        this.name = name;
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#init()
+     */
+    public boolean init() throws Exception {
+        boolean result = super.init();
+        if (result) {
+            String unicastURIStr = getConfiguration().getUnicastURI();
+            unicastURIStr=PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
+            URI unicastURI = new URI(unicastURIStr);
+            this.inboxURI = new Buffer(unicastURIStr);
+            BaseTransport transport = TransportFactory.get(unicastURI);
+            transport.setName(getId() + "-Unicast");
+            this.unicast = configureProcess(transport);
+            this.unicast.init();
+            // if using a port of zero - the port will be assigned automatically,
+            // so need to get the potentially new value
+            unicastURI = transport.getLocalURI();
+            //append configuration properties
+            
+            String groupManagementURIStr = getGroupConfiguration().getGroupManagementURI();
+            groupManagementURIStr=PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
+            URI groupManagementURI = new URI(groupManagementURIStr);
+            this.toManagementAddress = new InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
+            this.groupManagementTransport = TransportFactory.get(groupManagementURI);
+            configureTransport(this.groupManagementTransport);
+            this.groupManagementTransport.setPrev(this);
+            this.groupManagementTransport.setName(getId() + "-HeartbeatTransport");
+            this.groupManagementTransport.init();
+            this.local = createLocal(unicastURI);
+            this.group = createGroup();
+        }
+        return result;
+    }
+
+    protected MemberImpl createLocal(URI uri) throws Exception {
+        return new MemberImpl(getId(), getName(), 0, uri);
+    }
+
+    protected Group createGroup() {
+        return new Group(this);
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#shutDown()
+     */
+    public boolean shutDown() throws Exception {
+        boolean result = super.shutDown();
+        if (result) {
+            this.group.shutDown();
+            this.groupManagementTransport.shutDown();
+            this.unicast.shutDown();
+        }
+        return result;
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#start()
+     */
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            this.groupManagementTransport.start();
+            this.unicast.start();
+            this.group.start();
+        }
+        return result;
+    }
+
+    /**
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#stop()
+     */
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result) {
+            this.group.stop();
+            this.groupManagementTransport.stop();
+            this.unicast.stop();
+        }
+        return result;
+    }
+
+    /**
+     * @return the name
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * @return the inboxListener
+     */
+    public BlazeQueueListener getInboxListener() {
+        return this.inboxListener;
+    }
+
+    /**
+     * @param inboxListener
+     *            the inboxListener to set
+     */
+    public void setInboxListener(BlazeQueueListener inboxListener) {
+        this.inboxListener = inboxListener;
+    }
+
+    /**
+     * @return this channel's configuration
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#getGroupConfiguration()
+     */
+    public BlazeGroupConfiguration getGroupConfiguration() {
+        return (BlazeGroupConfiguration) getConfiguration();
+    }
+
+    /**
+     * @return the member for this channel
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#getLocalMember()
+     */
+    public final MemberImpl getLocalMember() {
+        synchronized (this.localMutex) {
+            return this.local;
+        }
+    }
+
+    /**
+     * @param l
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
+     */
+    public void addMemberChangedListener(MemberChangedListener l) {
+        this.group.addMemberChangedListener(l);
+    }
+
+    /**
+     * @param l
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
+     */
+    public void removeMemberChangedListener(MemberChangedListener l) {
+        this.group.removeMemberChangedListener(l);
+    }
+
+    /**
+     * @param id
+     * @return
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberById(java.lang.String)
+     */
+    public Member getMemberById(String id) {
+        return this.group.getMemberById(id);
+    }
+
+    /**
+     * @param name
+     * @return
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberByName(java.lang.String)
+     */
+    public Member getMemberByName(String name) {
+        return this.group.getMemberByName(name);
+    }
+
+    /**
+     * Will wait for a member to advertise itself if not available
+     * 
+     * @param name
+     * @param timeout
+     * @return the member or null
+     * @throws InterruptedException
+     */
+    public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
+        return this.group.getAndWaitForMemberByName(name, timeout);
+    }
+
+    /**
+     * @return
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#getMembers()
+     */
+    public Set<Member> getMembers() {
+        return this.group.getMembers();
+    }
+
+    /**
+     * Send a message to a member of the group - in a round-robin fashion
+     * 
+     * @param destination
+     * @param message
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
+     */
+    public void send(String destination, BlazeMessage message) throws Exception {
+        Buffer key = new Buffer(destination);
+        MemberImpl member = getQueueDestination(key);
+        if (member != null) {
+            send(member, key, message);
+        }
+    }
+
+    /**
+     * @param member
+     * @param destination
+     * @param message
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member,
+     *      org.apache.activeblaze.BlazeMessage)
+     */
+    public void send(Member member, BlazeMessage message) throws Exception {
+        send((MemberImpl) member, new Buffer(member.getInBoxDestination()), message);
+    }
+
+    /**
+     * @param member
+     * @param message
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
+     *      org.apache.activeblaze.BlazeMessage)
+     */
+    public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception {
+        return sendRequest(member, message, 0);
+    }
+
+    /**
+     * @param member
+     * @param message
+     * @param timeout
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
+     *      org.apache.activeblaze.BlazeMessage, int)
+     */
+    public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception {
+        return sendRequest((MemberImpl) member, new Buffer(member.getInBoxDestination()), message, timeout);
+    }
+
+    /**
+     * @param destination
+     * @param message
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessage)
+     */
+    public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
+        Buffer key = new Buffer(destination);
+        MemberImpl member = getQueueDestination(key);
+        return sendRequest(member, key, message, 0);
+    }
+
+    /**
+     * @param destination
+     * @param message
+     * @param timeout
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
+     *      org.apache.activeblaze.BlazeMessage, int)
+     */
+    public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
+        Buffer key = new Buffer(destination);
+        MemberImpl member = getQueueDestination(key);
+        return sendRequest(member, key, message, timeout);
+    }
+
+    protected synchronized BlazeMessage sendRequest(MemberImpl member, Buffer destination, BlazeMessage message,
+            int timeout) throws Exception {
+        BlazeMessage result = null;
+        if (member != null) {
+            SendRequest request = new SendRequest();
+            message.storeContent();
+            BlazeData blazeData = message.getContent();
+            blazeData.setTopic(false);
+            blazeData.setDestination(destination);
+            PacketData packetData = getPacketData(blazeData.type(), blazeData);
+            synchronized (this.messageRequests) {
+                this.messageRequests.put(packetData.getMessageId(), request);
+            }
+            packetData.setFromAddress(this.inboxURI);
+            Packet packet = new Packet(packetData);
+            packet.setTo((member).getAddress());
+            this.unicast.downStream(packet);
+            packetData = (PacketData) request.get(timeout);
+            result = buildBlazeMessage(packetData);
+        }
+        return result;
+    }
+
+    /**
+     * @param to
+     * @param response
+     * @param correlationId
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
+     *      org.apache.activeblaze.BlazeMessage, java.lang.String)
+     */
+    public synchronized void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
+        response.storeContent();
+        BlazeData blazeData = response.getContent();
+        blazeData.setTopic(false);
+        PacketData data = getPacketData(blazeData.type(), blazeData);
+        data.setCorrelationId(new Buffer(correlationId));
+        data.setReliable(true);
+        data.setFromAddress(this.inboxURI);
+        Packet packet = new Packet(data);
+        packet.setTo(((MemberImpl) to).getAddress());
+        this.unicast.downStream(packet);
+        
+    }
+
+    protected void send(MemberImpl member, Buffer destination, BlazeMessage message) throws Exception {
+        message.storeContent();
+        BlazeData blazeData = message.getContent();
+        send(member, destination, blazeData);
+    }
+
+    protected synchronized void send(MemberImpl member, Buffer destination, BlazeData blazeData) throws Exception {
+        blazeData.setTopic(false);
+        blazeData.setDestination(destination);
+        PacketData data = getPacketData(MessageType.BLAZE_DATA, blazeData);
+        data.setFromAddress(this.inboxURI);
+        Packet packet = new Packet(data);
+        packet.setTo(member.getAddress());
+        this.unicast.downStream(packet);
+    }
+
+    /**
+     * @param destination
+     * @param l
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
+     *      org.apache.activeblaze.group.BlazeQueueListener)
+     */
+    public void addBlazeQueueMessageListener(String destination, BlazeQueueListener l) throws Exception {
+        init();
+        Buffer key = new Buffer(destination);
+        this.queueMessageListenerMap.put(key, l);
+        buildLocal();
+    }
+
+    /**
+     * @param destination
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeGroupMessageListener(java.lang.String)
+     */
+    public BlazeQueueListener removeBlazeQueueMessageListener(String destination) throws Exception {
+        init();
+        Buffer key = new Buffer(destination);
+        BlazeQueueListener result = this.queueMessageListenerMap.remove(key);
+        buildLocal();
+        return result;
+    }
+
+    /**
+     * @param destination
+     * @param l
+     * @throws Exception
+     * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
+     *      org.apache.activeblaze.BlazeTopicListener)
+     */
+    public void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception {
+        init();
+        super.addBlazeTopicMessageListener(destination, l);
+        buildLocal();
+    }
+
+    /**
+     * @param destination
+     * @param l
+     * @return
+     * @throws Exception
+     * @see org.apache.activeblaze.BlazeChannel#removeBlazeMessageListener(java.lang.String,
+     *      org.apache.activeblaze.BlazeTopicListener)
+     */
+    public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception {
+        init();
+        BlazeTopicListener result = super.removeBlazeTopicMessageListener(destination);
+        buildLocal();
+        return result;
+    }
+
+    protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+        if (isStarted()) {
+            if (!processRequest(correlationId, data)) {
+                MessageType type = MessageType.valueOf(data.getType());
+                if (type == MessageType.BLAZE_DATA) {
+                    doProcessBlazeData(data);
+                } else if (type == MessageType.MEMBER_DATA) {
+                    doProcessMemberData(data);
+                }
+            }
+        }
+    }
+
+    boolean processRequest(Buffer correlationId, Message<?> value) {
+        boolean result = false;
+        if (correlationId != null) {
+            SendRequest request = null;
+            synchronized (this.messageRequests) {
+                request = this.messageRequests.remove(correlationId);
+            }
+            if (request != null) {
+                request.put(correlationId, value);
+                result = true;
+            }
+        }
+        return result;
+    }
+
+    protected void doProcessBlazeData(PacketData data) throws Exception {
+        BlazeMessage message = (BlazeMessage) buildBlazeMessage(data);
+        if (message.getContent().getTopic()) {
+            super.processBlazeMessage(message);
+        } else {
+            Buffer destination = message.getContent().getDestination();
+            if (this.inboxListener != null && this.producerId.equals(destination)) {
+                this.inboxListener.onMessage(message);
+            } else {
+                for (Map.Entry<Buffer, BlazeQueueListener> entry : this.queueMessageListenerMap.entrySet()) {
+                    if (DestinationMatch.isMatch(destination, entry.getKey())) {
+                        entry.getValue().onMessage(message);
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    protected Group getGroup() {
+        return this.group;
+    }
+
+    protected BlazeMessage createMessage(String fromId) {
+        Member member = this.group.getMemberById(fromId);
+        BlazeMessage message = new BlazeGroupMessage(member);
+        return message;
+    }
+
+    protected void doProcessMemberData(PacketData data) throws Exception {
+        MessageType type = MessageType.MEMBER_DATA;
+        MemberData memberData = (MemberData) type.createMessage();
+        Buffer payload = data.getPayload();
+        memberData.mergeFramed(payload);
+        this.group.processMember(memberData);
+    }
+
+    /**
+     * @param messageType
+     * @param message
+     * @throws Exception
+     */
+    public synchronized void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
+        PacketData data = getPacketData(messageType, message);
+        data.setReliable(false);
+        data.setFromAddress(this.inboxURI);
+        Packet packet = new Packet(data);
+        packet.setTo(this.toManagementAddress);
+        this.groupManagementTransport.downStream(packet);
+    }
+
+    /**
+     * @param asyncRequest
+     * @param messageType
+     * @param message
+     * @throws Exception
+     */
+    public synchronized void broadcastMessage(AsyncGroupRequest asyncRequest, MessageType messageType,
+            Message<?> message) throws Exception {
+        SendRequest request = new SendRequest();
+        PacketData data = getPacketData(messageType, message);
+        asyncRequest.add(data.getMessageId(), request);
+        synchronized (this.messageRequests) {
+            this.messageRequests.put(data.getMessageId(), request);
+        }
+        data.setReliable(false);
+        data.setFromAddress(this.inboxURI);
+        Packet packet = new Packet(data);
+        packet.setTo(this.toManagementAddress);
+        this.groupManagementTransport.downStream(packet);
+    }
+
+    /**
+     * @param to
+     * @param messageType
+     * @param message
+     * @throws Exception
+     */
+    public synchronized void sendMessage(InetSocketAddress to, MessageType messageType, Message<?> message)
+            throws Exception {
+        PacketData data = getPacketData(messageType, message);
+        data.setReliable(false);
+        data.setFromAddress(this.inboxURI);
+        Packet packet = new Packet(data);
+        packet.setTo(to);
+        this.unicast.downStream(packet);
+    }
+
+    /**
+     * @param to 
+     * @param messageType
+     * @param message
+     * @param correlationId
+     * @throws Exception
+     */
+    public synchronized void sendReply(MemberImpl to,MessageType messageType, Message<?> message, String correlationId)
+            throws Exception {
+        PacketData data = getPacketData(messageType, message);
+        data.setCorrelationId(new Buffer(correlationId));
+        data.setReliable(false);
+        data.setFromAddress(this.inboxURI);
+        Packet packet = new Packet(data);
+        packet.setTo(to.getAddress());
+        this.unicast.downStream(packet);
+    }
+
+    protected MemberImpl getQueueDestination(Buffer destination) {
+        // choose a member
+        MemberImpl result = null;
+        Map<Buffer, List<MemberImpl>> map = this.group.getQueueMap();
+        List<MemberImpl> list = map.get(destination);
+        if (list == null) {
+            // search through wildcard matches
+            for (Buffer buffer : map.keySet()) {
+                if (DestinationMatch.isMatch(destination, buffer)) {
+                    list = map.get(destination);
+                    break;
+                }
+            }
+        }
+        if (list != null && !list.isEmpty()) {
+            result = list.remove(0);
+            // round-robin
+            list.add(result);
+        }
+        return result;
+    }
+
+    protected void buildLocal() {
+        if (isInitialized()) {
+            try {
+                synchronized (this.localMutex) {
+                    MemberImpl result = new MemberImpl(getLocalMember().getData().clone());
+                    result.getData().clearDestination();
+                    // add topic destinations
+                    for (Buffer destination : this.topicessageListenerMap.keySet()) {
+                        DestinationData data = new DestinationData();
+                        data.setDestination(destination);
+                        data.setTopic(true);
+                        result.getData().addDestination(data);
+                    }
+                    // add Queue Destinations
+                    for (Buffer destination : this.queueMessageListenerMap.keySet()) {
+                        DestinationData data = new DestinationData();
+                        data.setDestination(destination);
+                        data.setTopic(false);
+                        result.getData().addDestination(data);
+                    }
+                    this.group.processMemberUpdate(this.local, result);
+                    result.getData().setDestinationsChanged(true);
+                    this.group.broadcastHeartBeat(result);
+                    result.getData().clearDestinationsChanged();
+                    this.local = result;
+                    this.group.updateLocal(this.local);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to update local member ", e);
+            }
+        } else {
+            throw new BlazeRuntimeException("Not Initialized");
+        }
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.BlazeConfiguration;
+
+/**
+ * Configuration for a BlazeGroupChannel
+ *
+ */
+public class BlazeGroupConfiguration extends BlazeConfiguration {
+    private String groupManagementURI = "mcast://224.2.2.2:8888";
+    
+    private int heartBeatInterval = 1000;
+   
+    /**
+     * @return the groupManagementUTI
+     */
+    public String getGroupManagementURI() {
+        return this.groupManagementURI;
+    }
+
+    /**
+     * @param groupManagementURI
+     *            the groupManagementURI to set
+     */
+    public void setGroupManagementURI(String groupManagementURI) {
+        this.groupManagementURI = groupManagementURI;
+    }
+
+    
+    /**
+     * @return the heartBeatInterval
+     */
+    public int getHeartBeatInterval() {
+        return this.heartBeatInterval;
+    }
+
+    /**
+     * @param heartBeatInterval the heartBeatInterval to set
+     */
+    public void setHeartBeatInterval(int heartBeatInterval) {
+        this.heartBeatInterval = heartBeatInterval;
+    }
+    
+    protected BlazeConfiguration newInstance() {
+        return new BlazeGroupConfiguration();
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeMessage;
+
+/**
+ * Has information about the sender of the Message
+ * This type of message is created on receiver
+ *
+ */
+public class BlazeGroupMessage extends BlazeMessage {
+    private final Member sender;
+    
+    
+    /**
+     * Constructor
+     * @param sender
+     */
+    public BlazeGroupMessage(Member sender){
+        this.sender=sender;
+    }
+    
+    public BlazeMessage copy() throws BlazeException{
+        BlazeMessage copy = new BlazeGroupMessage(this.sender);
+        copy(copy);
+        return copy;
+    }
+
+    /**
+     * @return the sender
+     */
+    public Member getSender() {
+        return this.sender;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.BlazeMessage;
+
+
+/**
+ * A listener for BlazeMessages
+ *
+ */
+public interface BlazeQueueListener {
+    
+    /**
+     * Called when a Message is available to be processes
+     * @param message
+     */
+    public void onMessage(BlazeMessage message);
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.wire.DestinationData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Represents a Destination
+ * 
+ */
+class Destination {
+    private final boolean topic;
+    private final Buffer destination;
+    private final MemberImpl member;
+
+    Destination(MemberImpl member, DestinationData data) {
+        this.member = member;
+        this.topic = data.getTopic();
+        this.destination = data.getDestination();
+    }
+
+    /**
+     * @return the topic
+     */
+    public boolean isTopic() {
+        return this.topic;
+    }
+
+    /**
+     * @return the destination
+     */
+    public Buffer getDestination() {
+        return this.destination;
+    }
+
+    /**
+     * @return the member
+     */
+    public MemberImpl getMember() {
+        return this.member;
+    }
+
+    public int hashCode() {
+        return getMember().getId().hashCode() ^ getDestination().hashCode();
+    }
+
+    public boolean equals(Object object) {
+        boolean result = false;
+        if (object instanceof Destination) {
+            Destination other = (Destination) object;
+            result = other.getMember().getId().equals(this.member.getId())
+                    && other.destination.equals(this.destination);
+        }
+        return result;
+    }
+}

Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java
------------------------------------------------------------------------------
    svn:eol-style = native