You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/21 21:44:43 UTC
svn commit: r719706 [3/6] - in /activemq/activemq-blaze: ./ branches/ tags/
trunk/ trunk/src/ trunk/src/main/ trunk/src/main/java/
trunk/src/main/java/org/ trunk/src/main/java/org/apache/
trunk/src/main/java/org/apache/activeblaze/ trunk/src/main/java/...
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.net.URI;
+import org.apache.activeblaze.group.BlazeGroupChannelImpl;
+import org.apache.activeblaze.group.Group;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.wire.ElectionMessage;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication and maintains a coordinator
+ * (elected leader) for the group
+ *
+ */
+public class BlazeCoordinatedGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeCoordinatedGroupChannel {
+ private static final Log LOG = LogFactory.getLog(BlazeCoordinatedGroupChannelImpl.class);
+ private CoordinatedGroup coordinatedGroup;
+
+ /**
+ * Constructor
+ *
+ * @param name
+ */
+ public BlazeCoordinatedGroupChannelImpl(String name) {
+ super(name);
+ }
+
+ /**
+ * @param l
+ * @throws Exception
+ * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#addCoordinatorChangedListener(org.apache.activeblaze.coordinated.CoordinatorChangedListener)
+ */
+ public void addCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception {
+ init();
+ this.coordinatedGroup.addCoordinatorChangedListener(l);
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#getCoordinator()
+ */
+ public Member getCoordinator() throws Exception {
+ init();
+ return this.coordinatedGroup.getCoordinator();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#isCoordinator()
+ */
+ public boolean isCoordinator() throws Exception {
+ init();
+ return this.coordinatedGroup.isCoordinatorMatch();
+ }
+
+ /**
+ * @param l
+ * @throws Exception
+ * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#removeMemberChangedListener(org.apache.activeblaze.coordinated.CoordinatorChangedListener)
+ */
+ public void removeCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception {
+ init();
+ this.coordinatedGroup.removeCoordinatorChangedListener(l);
+ }
+
+ /**
+ * @return
+ * @see org.apache.activeblaze.coordinated.BlazeCoordinatedGroupChannel#getCoordinatedGroupConfiguration()
+ */
+ public BlazeCoordinatedGroupConfiguration getCoordinatedGroupConfiguration() {
+ return (BlazeCoordinatedGroupConfiguration) getGroupConfiguration();
+ }
+
+ /**
+ * @param timeout
+ * @return
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#waitForElection(int)
+ */
+ public boolean waitForElection(int timeout) throws Exception {
+ init();
+ return this.coordinatedGroup.waitForElection(timeout);
+ }
+
+ protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+ if (isStarted()) {
+ MessageType type = MessageType.valueOf(data.getType());
+ if (type == MessageType.BLAZE_DATA) {
+ doProcessBlazeData(data);
+ } else if (type == MessageType.MEMBER_DATA) {
+ doProcessMemberData(data);
+ } else if (type.equals(MessageType.ELECTION_MESSAGE)) {
+ doProcessElectionData(id, data);
+ } else {
+ LOG.error("Unknown message type " + data);
+ }
+ }
+ }
+
+ protected MemberImpl createLocal(URI uri) throws Exception {
+ return new MemberImpl(getId(), getName(), getCoordinatedGroupConfiguration().getCoordinatorWeight(), uri);
+ }
+
+ protected Group createGroup() {
+ this.coordinatedGroup = new CoordinatedGroup(this);
+ return this.coordinatedGroup;
+ }
+
+ protected void doProcessElectionData(String id, PacketData data) throws Exception {
+ MessageType type = MessageType.ELECTION_MESSAGE;
+ ElectionMessage electionMessage = (ElectionMessage) type.createMessage();
+ Buffer payload = data.getPayload();
+ electionMessage.mergeFramed(payload);
+ CoordinatedGroup group = (CoordinatedGroup) getGroup();
+ group.processElectionMessage(electionMessage, id);
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupChannelImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import org.apache.activeblaze.BlazeConfiguration;
+import org.apache.activeblaze.group.BlazeGroupConfiguration;
+
+/**
+ * Configuration for a BlazeCoordinatedGroupChannel
+ *
+ */
+public class BlazeCoordinatedGroupConfiguration extends BlazeGroupConfiguration{
+ private long coordinatorWeight = 0;
+ private int minimumGroupSize = 1;
+ private long awaitGroupTimeout = getHeartBeatInterval()*2;
+
+
+ /**
+ * @return the coordinatorWeight
+ */
+ public long getCoordinatorWeight() {
+ return this.coordinatorWeight;
+ }
+
+ /**
+ * @param coordinatorWeight the coordinatorWeight to set
+ */
+ public void setCoordinatorWeight(long coordinatorWeight) {
+ this.coordinatorWeight = coordinatorWeight;
+ }
+
+ /**
+ * @return the minimumGroupSize
+ */
+ public int getMinimumGroupSize() {
+ return this.minimumGroupSize;
+ }
+
+ /**
+ * @param minimumGroupSize the minimumGroupSize to set
+ */
+ public void setMinimumGroupSize(int minimumGroupSize) {
+ this.minimumGroupSize = minimumGroupSize;
+ }
+
+ /**
+ * @return the awaitGroupTimeout
+ */
+ public long getAwaitGroupTimeout() {
+ return this.awaitGroupTimeout;
+ }
+
+ /**
+ * @param awaitGroupTimeout the awaitGroupTimeout to set
+ */
+ public void setAwaitGroupTimeout(long awaitGroupTimeout) {
+ this.awaitGroupTimeout = awaitGroupTimeout;
+ }
+
+ protected BlazeConfiguration newInstance() {
+ return new BlazeCoordinatedGroupConfiguration();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/BlazeCoordinatedGroupConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activeblaze.group.AsyncGroupRequest;
+import org.apache.activeblaze.group.Group;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.wire.ElectionMessage;
+import org.apache.activeblaze.wire.ElectionType;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Group state for a coordinated channel: tracks the current coordinator and runs elections on a
+ * dedicated single-threaded executor whenever membership changes or an election message arrives.
+ * <P>
+ * The {@link #electionFinished} flag doubles as the monitor that threads wait on for an election result.
+ */
+public class CoordinatedGroup extends Group {
+    static final Log LOG = LogFactory.getLog(CoordinatedGroup.class);
+    final BlazeCoordinatedGroupChannelImpl channel;
+    private final BlazeCoordinatedGroupConfiguration configuration;
+    private ThreadPoolExecutor electionExecutor;
+    // volatile: written by the election thread and by election-message processing, read by channel callers
+    private volatile MemberImpl coordinator;
+    private List<CoordinatorChangedListener> listeners = new CopyOnWriteArrayList<CoordinatorChangedListener>();
+    final AtomicBoolean electionFinished = new AtomicBoolean(false);
+    private long startTime;
+
+    /**
+     * Constructor. The local member of the channel is the initial coordinator.
+     *
+     * @param channel the owning channel; also the source of the configuration
+     */
+    protected CoordinatedGroup(BlazeCoordinatedGroupChannelImpl channel) {
+        super(channel);
+        this.channel = channel;
+        this.coordinator = this.channel.getLocalMember();
+        this.configuration = channel.getCoordinatedGroupConfiguration();
+    }
+
+    /**
+     * Start the group; on a successful start, record the start time and create the election executor
+     * (one daemon thread named after the channel id, unbounded queue).
+     *
+     * @return the result of the superclass start
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#start()
+     */
+    public boolean start() throws Exception {
+        boolean result = super.start();
+        if (result) {
+            this.startTime = System.currentTimeMillis();
+            this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                    new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+                        public Thread newThread(Runnable runnable) {
+                            Thread thread = new Thread(runnable, "Election{" + CoordinatedGroup.this.channel.getId()
+                                    + "}");
+                            thread.setDaemon(true);
+                            return thread;
+                        }
+                    });
+        }
+        return result;
+    }
+
+    /**
+     * Stop the group; on a successful stop, shut the election executor down immediately.
+     *
+     * @return the result of the superclass stop
+     * @throws Exception
+     * @see org.apache.activeblaze.Service#stop()
+     */
+    public boolean stop() throws Exception {
+        boolean result = super.stop();
+        if (result) {
+            if (this.electionExecutor != null) {
+                this.electionExecutor.shutdownNow();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @return true if elections have finished
+     */
+    public boolean isElectionFinished() {
+        return this.electionFinished.get();
+    }
+
+    /**
+     * Set the election-finished flag and, when it becomes true, wake every thread waiting on it.
+     *
+     * @param flag the new value
+     */
+    void setElectionFinished(boolean flag) {
+        synchronized (this.electionFinished) {
+            this.electionFinished.set(flag);
+            if (flag) {
+                // Without this notification waiters only wake when their own timeout expires, and
+                // waitForElection(0) would never return. Only a finished election is announced, so a
+                // waiting ElectionService is not woken early by a reset to false.
+                this.electionFinished.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Process a new member and trigger an election if the superclass reported a member, or if no
+     * election has finished yet and the data is not about the local member.
+     *
+     * @param data the member data received
+     * @return whatever the superclass returned (may be null)
+     * @throws Exception
+     */
+    protected MemberImpl processMember(MemberData data) throws Exception {
+        MemberImpl result = super.processMember(data);
+        if (result != null || (!isElectionFinished() && !data.getId().equals(getLocalMember().getId()))) {
+            election(result, true);
+        }
+        return result;
+    }
+
+    /**
+     * Process a stopped member and always trigger a fresh election afterwards.
+     *
+     * @param member the member that stopped
+     * @throws Exception
+     */
+    protected void processMemberStopped(MemberImpl member) throws Exception {
+        super.processMemberStopped(member);
+        election(null, false);
+    }
+
+    /**
+     * Schedule an election on the election executor. Does nothing unless the group is started and the
+     * executor is available.
+     *
+     * @param member the member that triggered the election, or null
+     * @param memberStarted passed through to the {@link ElectionService}
+     * @throws Exception
+     */
+    void election(final Member member, final boolean memberStarted) throws Exception {
+        if (isStarted() && this.electionExecutor != null && !this.electionExecutor.isShutdown()) {
+            synchronized (this.electionFinished) {
+                this.electionFinished.set(false);
+            }
+            // NOTE(review): the original "if" had no braces, so it guards ONLY the purge of queued
+            // tasks; a new ElectionService is scheduled regardless. That behaviour is kept here -
+            // confirm whether the scheduling below was meant to be conditional as well.
+            if (this.members.size() >= getConfiguration().getMinimumGroupSize()
+                    || (getConfiguration().getAwaitGroupTimeout() + this.startTime) < System.currentTimeMillis()) {
+                synchronized (this.electionExecutor) {
+                    // remove any queued election tasks
+                    List<Runnable> list = new ArrayList<Runnable>(this.electionExecutor.getQueue());
+                    for (Runnable r : list) {
+                        ElectionService es = (ElectionService) r;
+                        es.stop();
+                        this.electionExecutor.remove(es);
+                    }
+                }
+            }
+            ElectionService es = new ElectionService(this, member, memberStarted);
+            es.start();
+            this.electionExecutor.execute(es);
+        }
+    }
+
+    /**
+     * @return true if the id of the owning channel equals the id of the current coordinator
+     */
+    protected boolean isCoordinatorMatch() {
+        // read the field once so the null check and getId() see the same value
+        MemberImpl current = this.coordinator;
+        String coordinatorId = current != null ? current.getId() : "";
+        return this.channel.getId().equals(coordinatorId);
+    }
+
+    /**
+     * @return the current coordinator
+     */
+    protected MemberImpl getCoordinator() {
+        return this.coordinator;
+    }
+
+    /**
+     * @param member the coordinator to set
+     */
+    protected void setCoordinator(MemberImpl member) {
+        this.coordinator = member;
+    }
+
+    /**
+     * Add a listener for coordinator changes
+     *
+     * @param l the listener to add
+     */
+    protected void addCoordinatorChangedListener(CoordinatorChangedListener l) {
+        this.listeners.add(l);
+    }
+
+    /**
+     * Remove a listener for coordinator changes
+     *
+     * @param l the listener to remove
+     */
+    protected void removeCoordinatorChangedListener(CoordinatorChangedListener l) {
+        this.listeners.remove(l);
+    }
+
+    /**
+     * Call every registered listener with the new coordinator.
+     *
+     * @param newCoordinator the coordinator to announce
+     */
+    protected void fireCoordinatorChanged(MemberImpl newCoordinator) {
+        for (CoordinatorChangedListener l : this.listeners) {
+            l.coordinatorChanged(newCoordinator);
+        }
+    }
+
+    /**
+     * Broadcast an ELECTION message once for every member sorted after the local member, then wait up
+     * to one heartbeat interval for the tracked requests to complete.
+     *
+     * @return the outcome reported by the request tracker
+     * @throws Exception
+     */
+    boolean callElection() throws Exception {
+        List<MemberImpl> members = new ArrayList<MemberImpl>(this.members.values());
+        List<MemberImpl> sorted = CoordinatedGroup.sortMemberList(members);
+        AsyncGroupRequest request = new AsyncGroupRequest();
+        boolean doCall = false;
+        for (Member member : sorted) {
+            if (this.channel.getId().equals(member.getId())) {
+                doCall = true;
+            } else if (doCall) {
+                ElectionMessage msg = new ElectionMessage();
+                msg.setMember(this.channel.getLocalMember().getData());
+                msg.setElectionType(ElectionType.ELECTION);
+                this.channel.broadcastMessage(request, msg.type(), msg);
+            }
+        }
+        boolean result = request.isSuccess(this.configuration.getHeartBeatInterval());
+        return result;
+    }
+
+    /**
+     * Handle an election message from another member. An ELECTION message is answered and triggers a
+     * local election; a COORDINATOR message makes the sender the coordinator and finishes the election.
+     * Messages from the local member are ignored.
+     *
+     * @param msg the election message
+     * @param correlationId used when replying to an ELECTION message
+     * @throws Exception
+     */
+    void processElectionMessage(ElectionMessage msg, String correlationId) throws Exception {
+        MemberImpl from = new MemberImpl(msg.getMember());
+        if (!from.getId().equals(getLocalMember().getId())) {
+            LOG.debug(getLocalMember()+" Election message "+ msg.getElectionType() + " from " + from);
+            if (msg.getElectionType().equals(ElectionType.ELECTION)) {
+                ElectionMessage reply = new ElectionMessage();
+                reply.setElectionType(ElectionType.ANSWER);
+                reply.setMember(this.channel.getLocalMember().getData());
+                this.channel.sendReply(from, msg.type(), reply, correlationId);
+                election(null, false);
+            } else if (msg.getElectionType().equals(ElectionType.COORDINATOR)) {
+                this.coordinator = from;
+                LOG.debug(getLocalMember()+" Coordinator is "+ from);
+                setElectionFinished(true);
+            }
+        }
+    }
+
+    /**
+     * Broadcast an election message of the given type on behalf of the local member. No-op unless started.
+     *
+     * @param type the election type to broadcast
+     * @throws Exception
+     */
+    void broadcastElectionType(ElectionType type) throws Exception {
+        if (isStarted()) {
+            ElectionMessage msg = new ElectionMessage();
+            msg.setMember(this.channel.getLocalMember().getData());
+            msg.setElectionType(type);
+            this.channel.broadcastMessage(MessageType.ELECTION_MESSAGE, msg);
+        }
+    }
+
+    /**
+     * Wait for an election to finish.
+     *
+     * @param timeout maximum time to wait in milliseconds; a value of 0 waits without a time limit
+     * @return true if the group has not been stopped and an election has finished
+     * @throws Exception
+     */
+    boolean waitForElection(int timeout) throws Exception {
+        long deadline = 0;
+        if (timeout > 0) {
+            deadline = System.currentTimeMillis() + timeout;
+        }
+        synchronized (this.electionFinished) {
+            long remaining = timeout;
+            while (isStarted() && !this.electionFinished.get()) {
+                if (timeout > 0) {
+                    remaining = deadline - System.currentTimeMillis();
+                    if (remaining <= 0) {
+                        // deadline reached: leave the loop rather than calling wait(0), which would
+                        // block with no time limit
+                        break;
+                    }
+                }
+                try {
+                    this.electionFinished.wait(remaining);
+                } catch (InterruptedException e) {
+                    LOG.warn("Interrupted in waitForElection");
+                    stop();
+                    // keep the interrupt visible to the caller and stop waiting
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+        return !isStopped() && this.electionFinished.get();
+    }
+
+    /**
+     * Sort the list in place by ascending coordinator weight, ties broken by member id.
+     *
+     * @param list the list to sort
+     * @return the same list instance, sorted
+     */
+    protected static List<MemberImpl> sortMemberList(List<MemberImpl> list) {
+        Collections.sort(list, new Comparator<Member>() {
+            public int compare(Member m1, Member m2) {
+                // compare explicitly: narrowing a long difference to int can overflow or truncate
+                // and report the wrong ordering
+                long w1 = m1.getCoordinatorWeight();
+                long w2 = m2.getCoordinatorWeight();
+                if (w1 < w2) {
+                    return -1;
+                }
+                if (w1 > w2) {
+                    return 1;
+                }
+                return m1.getId().compareTo(m2.getId());
+            }
+        });
+        return list;
+    }
+
+    /**
+     * @return the configuration
+     */
+    public BlazeCoordinatedGroupConfiguration getConfiguration() {
+        return this.configuration;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatedGroup.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberChangedListener;
+
+/**
+ * A listener for coordinator changes to a group. It extends <CODE>MemberChangedListener</CODE>, so
+ * implementations must also provide that interface's callbacks.
+ *
+ */
+public interface CoordinatorChangedListener extends MemberChangedListener {
+ /**
+ * Fired when a coordinator changes in the group.
+ * <CODE>CoordinatedGroup.fireCoordinatorChanged</CODE> invokes this on each registered listener.
+ *
+ * @param newCoordinator the member that is now the coordinator
+ */
+ void coordinatorChanged(Member newCoordinator);
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/CoordinatorChangedListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.coordinated;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.activeblaze.BaseService;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberImpl;
+import org.apache.activeblaze.wire.ElectionType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Responsible for calling an election amongst the members and deciding a
+ * coordinator
+ *
+ */
+class ElectionService extends BaseService implements Runnable {
+ private static final Log LOG = LogFactory.getLog(ElectionService.class);
+ private final CoordinatedGroup group;
+ private Member member;
+ ElectionService(CoordinatedGroup group,Member member, boolean memberStarted) {
+ this.group=group;
+ this.member = member;
+ }
+
+
+ public void run() {
+ try {
+ doElection();
+ } catch (Exception e) {
+ LOG.error("Failed to run election",e);
+ }
+ }
+
+ void doElection() throws Exception {
+ List<MemberImpl> members = new ArrayList<MemberImpl>(this.group.getMembersImpl());
+ if ((this.member == null || (!this.member.getId().equals(this.group.getId()) || members.size() == this.group.getConfiguration().getMinimumGroupSize()))) {
+
+ // call an election
+ while (!this.group.callElection() && this.group.isStarted() && isStarted())
+ ;
+ if (this.group.isStarted() && isStarted()) {
+
+ this.group.setCoordinator(selectCordinator(members));
+ if (this.group.isCoordinatorMatch()) {
+ this.group.broadcastElectionType(ElectionType.COORDINATOR);
+ }
+ if (!this.group.isElectionFinished() && isStarted()) {
+ //ok - lets just wait for more members to show
+ //we could be the coordinator now - but best to check
+ try {
+ synchronized (this.group.electionFinished) {
+ this.group.electionFinished.wait(this.group.getConfiguration().getHeartBeatInterval() * 2);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ if (!this.group.isElectionFinished() && isStarted()) {
+ // we must be the coordinator
+ this.group.setCoordinator(this.group.getLocalMember());
+ this.group.setElectionFinished(true);
+ LOG.debug(this.group.getLocalMember()+" We are the Coordinator ");
+ this.group.broadcastElectionType(ElectionType.COORDINATOR);
+ }
+ }
+ }
+ }
+
+ protected MemberImpl selectCordinator(List<MemberImpl> list) throws Exception {
+ List<MemberImpl> sorted = CoordinatedGroup.sortMemberList(list);
+ MemberImpl result = sorted.isEmpty() ? this.group.getLocalMember() : sorted
+ .get(list.size() - 1);
+ return result;
+ }
+
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/ElectionService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/package.html?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/package.html (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/coordinated/package.html Fri Nov 21 12:44:40 2008
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Coordinated group messaging
+
+</body>
+</html>
\ No newline at end of file
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Keeps track of multiple outstanding requests, identified by their ids, and lets a caller wait
+ * until all of them have finished. Every access to the id set is guarded by one mutex.
+ *
+ */
+public class AsyncGroupRequest implements RequestCallback {
+    private final Object mutex = new Object();
+
+    private Set<Buffer> requests = new HashSet<Buffer>();
+
+    /**
+     * Track a request and register this object as its callback.
+     *
+     * @param id the id under which the request is tracked
+     * @param request the request to receive the callback
+     */
+    public void add(Buffer id, SendRequest request) {
+        synchronized (this.mutex) {
+            // record the id before the callback is installed so finished(id) always finds it
+            this.requests.add(id);
+        }
+        request.setCallback(this);
+    }
+
+    /**
+     * Wait for requests
+     *
+     * @param timeout maximum time to wait in milliseconds; 0 waits until every request has finished
+     * @return true if no tracked request is outstanding when the wait ends
+     */
+    public boolean isSuccess(long timeout) {
+        long deadline = System.currentTimeMillis() + timeout;
+        // The emptiness check and the wait share the monitor, so a finished() call between them
+        // cannot be missed.
+        synchronized (this.mutex) {
+            long remaining = timeout;
+            while (!this.requests.isEmpty()) {
+                try {
+                    this.mutex.wait(remaining);
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to the caller and give up waiting
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                if (timeout > 0) {
+                    remaining = deadline - System.currentTimeMillis();
+                    if (remaining <= 0) {
+                        // deadline reached: stop rather than calling wait(0), which has no time limit
+                        break;
+                    }
+                }
+            }
+            return this.requests.isEmpty();
+        }
+    }
+
+    /**
+     * Mark a request as finished and wake waiters once nothing is outstanding.
+     *
+     * @param id the id of the finished request
+     */
+    public void finished(Buffer id) {
+        synchronized (this.mutex) {
+            this.requests.remove(id);
+            if (this.requests.isEmpty()) {
+                this.mutex.notifyAll();
+            }
+        }
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/AsyncGroupRequest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.util.Set;
+import org.apache.activeblaze.BlazeChannel;
+import org.apache.activeblaze.BlazeMessage;
+
+/**
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
+ * and extends {@link BlazeChannel}.
+ * <P>
+ * Messages can be addressed either to an individual {@link Member} or to a named
+ * destination, in which case one member is picked in a round-robin fashion.
+ */
+public interface BlazeGroupChannel extends BlazeChannel {
+ /**
+ * @return the name of this channel
+ */
+ public String getName();
+
+ /**
+ * Send a message to an individual member
+ *
+ * @param member
+ * the member to send the message to
+ * @param message
+ * the message to send
+ * @throws Exception
+ */
+ public void send(Member member, BlazeMessage message) throws Exception;
+
+ /**
+ * Send a message to an individual member and wait for a response
+ *
+ * @param member
+ * the member to send the request to
+ * @param message
+ * the request message
+ * @return the response
+ * @throws Exception
+ */
+ public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception;
+
+ /**
+ * Send a message to an individual member and wait for a response
+ *
+ * @param member
+ * the member to send the request to
+ * @param message
+ * the request message
+ * @param timeout
+ * time in milliseconds to wait for a response
+ * @return a response of null if timed out
+ * @throws Exception
+ */
+ public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception;
+
+ /**
+ * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion
+ *
+ * @param destination
+ * the name of the destination to send to
+ * @param message
+ * the message to send
+ * @throws Exception
+ */
+ public void send(String destination, BlazeMessage message) throws Exception;
+
+ /**
+ * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
+ * for a response
+ *
+ * @param destination
+ * the name of the destination to send to
+ * @param message
+ * the request message
+ * @return a response
+ * @throws Exception
+ */
+ public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception;
+
+ /**
+ * Send a message to the group - one Member will be picked to receive the message in a round-robin fashion and wait
+ * for a response
+ *
+ * @param destination
+ * the name of the destination to send to
+ * @param message
+ * the request message
+ * @param timeout
+ * time in milliseconds to wait for a response
+ * @return a response of null if timed out
+ * @throws Exception
+ */
+ public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception;
+
+ /**
+ * Send a response message to an original message - for request/response
+ *
+ * @param to
+ * the Member to send a response to
+ * @param response
+ * the message to send in a response
+ * @param correlationId
+ * the associated id from the original message
+ * @throws Exception
+ */
+ public void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception;
+
+ /**
+ * @return the inboxListener
+ */
+ public BlazeQueueListener getInboxListener();
+
+ /**
+ * @param inboxListener
+ * the inboxListener to set
+ */
+ public void setInboxListener(BlazeQueueListener inboxListener);
+
+ /**
+ * @return the configuration
+ */
+ public BlazeGroupConfiguration getGroupConfiguration();
+
+ /**
+ * @return a set of the members
+ */
+ public Set<Member> getMembers();
+
+ /**
+ * Get a member by its unique id
+ *
+ * @param id
+ * the unique id of the member
+ * @return the member with the matching id
+ */
+ public Member getMemberById(String id);
+
+ /**
+ * Return a member of the Group with the matching name
+ *
+ * @param name
+ * the name of the member
+ * @return the member with the matching name
+ */
+ public Member getMemberByName(String name);
+
+ /**
+ * Will wait for a member to advertise itself if not available
+ *
+ * @param name
+ * the name of the member to look up
+ * @param timeout
+ * how long to wait for the member (presumably milliseconds, as for sendRequest - TODO confirm)
+ * @return the member or null
+ * @throws InterruptedException
+ */
+ public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException;
+
+ /**
+ * @return the local member that represents this <CODE>Group</CODE> instance
+ */
+ public Member getLocalMember();
+
+ /**
+ * Add a listener for membership changes
+ *
+ * @param l
+ * the listener to add
+ */
+ public void addMemberChangedListener(MemberChangedListener l);
+
+ /**
+ * Remove a listener for membership changes
+ *
+ * @param l
+ * the listener to remove
+ */
+ public void removeMemberChangedListener(MemberChangedListener l);
+
+ /**
+ * Add a listener for messages
+ *
+ * @param destination
+ * the name of the destination to listen on
+ * @param l
+ * the listener to register for that destination
+ * @throws Exception
+ */
+ public void addBlazeQueueMessageListener(String destination, BlazeQueueListener l) throws Exception;
+
+ /**
+ * Remove a listener for messages
+ *
+ * @param destination
+ * the name of the destination whose listener should be removed
+ * @return the removed listener
+ * @throws Exception
+ */
+ public BlazeQueueListener removeBlazeQueueMessageListener(String destination) throws Exception;
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.BlazeChannelFactory;
+
+/**
+ * Factory for {@link BlazeGroupChannel} instances. Every channel it creates
+ * receives its own copy of the factory's {@link BlazeGroupConfiguration}.
+ */
+public class BlazeGroupChannelFactory extends BlazeChannelFactory {
+    /**
+     * Create a factory backed by a freshly constructed {@link BlazeGroupConfiguration}.
+     */
+    public BlazeGroupChannelFactory() {
+        super(new BlazeGroupConfiguration());
+    }
+
+    /**
+     * Create a factory backed by the supplied configuration.
+     *
+     * @param config
+     *            the configuration handed to the parent factory
+     */
+    public BlazeGroupChannelFactory(BlazeGroupConfiguration config) {
+        super(config);
+    }
+
+    /**
+     * Create a new group channel with the given name and a copy of this
+     * factory's configuration.
+     *
+     * @param name
+     *            the name of the channel
+     * @return the newly created channel
+     * @throws Exception
+     */
+    public BlazeGroupChannel createGroupChannel(String name) throws Exception {
+        BlazeGroupChannelImpl channel = new BlazeGroupChannelImpl(name);
+        channel.setConfiguration(getConfiguration().copy());
+        return channel;
+    }
+
+    /**
+     * @return the parent factory's configuration, cast to {@link BlazeGroupConfiguration}
+     */
+    public BlazeGroupConfiguration getConfiguration() {
+        return (BlazeGroupConfiguration) super.getConfiguration();
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,647 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.activeblaze.BlazeChannelImpl;
+import org.apache.activeblaze.BlazeMessage;
+import org.apache.activeblaze.BlazeRuntimeException;
+import org.apache.activeblaze.BlazeTopicListener;
+import org.apache.activeblaze.Processor;
+import org.apache.activeblaze.impl.destination.DestinationMatch;
+import org.apache.activeblaze.impl.processor.Packet;
+import org.apache.activeblaze.impl.transport.BaseTransport;
+import org.apache.activeblaze.impl.transport.TransportFactory;
+import org.apache.activeblaze.util.PropertyUtil;
+import org.apache.activeblaze.wire.BlazeData;
+import org.apache.activeblaze.wire.DestinationData;
+import org.apache.activeblaze.wire.MemberData;
+import org.apache.activeblaze.wire.MessageType;
+import org.apache.activeblaze.wire.PacketData;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * <P>
+ * A <CODE>BlazeGroupChannel</CODE> enables peer-based point to point communication
+ *
+ */
+public class BlazeGroupChannelImpl extends BlazeChannelImpl implements BlazeGroupChannel {
+ private static final Log LOG = LogFactory.getLog(BlazeGroupChannelImpl.class);
+ private final String name;
+ private Processor unicast;
+ private BaseTransport groupManagementTransport;
+ private InetSocketAddress toManagementAddress;
+ private MemberImpl local;
+ private BlazeQueueListener inboxListener;
+ private Map<Buffer, SendRequest> messageRequests = new HashMap<Buffer, SendRequest>();
+ private Map<Buffer, BlazeQueueListener> queueMessageListenerMap = new ConcurrentHashMap<Buffer, BlazeQueueListener>();
+ private Group group;
+ private Buffer inboxURI;
+ private final Object localMutex = new Object();
+
+ /**
+ * Create a channel with the given name.
+ *
+ * @param name
+ *            the name reported by {@link #getName()}
+ */
+ protected BlazeGroupChannelImpl(String name) {
+     this.name = name;
+ }
+
+ /**
+ * Initialize the channel. When the parent's init() reports true, this sets up the
+ * unicast processor, the group management transport, the local member and the group.
+ *
+ * @return the value returned by the parent's init()
+ * @throws Exception
+ * @see org.apache.activeblaze.Service#init()
+ */
+ public boolean init() throws Exception {
+ boolean result = super.init();
+ if (result) {
+ // unicast side: take the configured URI and append the configuration properties
+ String unicastURIStr = getConfiguration().getUnicastURI();
+ unicastURIStr=PropertyUtil.addPropertiesToURIFromBean(unicastURIStr, getConfiguration());
+ URI unicastURI = new URI(unicastURIStr);
+ // NOTE(review): inboxURI is built from the configured string, i.e. before the
+ // transport has resolved its local URI below - confirm this is intended when
+ // a port of zero is configured
+ this.inboxURI = new Buffer(unicastURIStr);
+ BaseTransport transport = TransportFactory.get(unicastURI);
+ transport.setName(getId() + "-Unicast");
+ this.unicast = configureProcess(transport);
+ this.unicast.init();
+ // if using a port of zero - the port will be assigned automatically,
+ // so need to get the potentially new value
+ unicastURI = transport.getLocalURI();
+
+ // group management side: configured URI with the configuration properties appended
+ String groupManagementURIStr = getGroupConfiguration().getGroupManagementURI();
+ groupManagementURIStr=PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
+ URI groupManagementURI = new URI(groupManagementURIStr);
+ // address that the broadcastMessage methods set on outgoing packets
+ this.toManagementAddress = new InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
+ this.groupManagementTransport = TransportFactory.get(groupManagementURI);
+ configureTransport(this.groupManagementTransport);
+ this.groupManagementTransport.setPrev(this);
+ this.groupManagementTransport.setName(getId() + "-HeartbeatTransport");
+ this.groupManagementTransport.init();
+ // the local member is created with the URI reported by the unicast transport
+ this.local = createLocal(unicastURI);
+ this.group = createGroup();
+ }
+ return result;
+ }
+
+ /**
+ * Build the member that represents this channel.
+ *
+ * @param uri
+ *            the URI handed to the new member
+ * @return a new member created from this channel's id and name
+ * @throws Exception
+ */
+ protected MemberImpl createLocal(URI uri) throws Exception {
+     MemberImpl member = new MemberImpl(getId(), getName(), 0, uri);
+     return member;
+ }
+
+ /**
+ * Build the group object for this channel.
+ *
+ * @return a new {@link Group} constructed with this channel
+ */
+ protected Group createGroup() {
+     Group created = new Group(this);
+     return created;
+ }
+
+ /**
+ * Shut down the channel. When the parent's shutDown() reports true, the group,
+ * the group management transport and the unicast processor are shut down, in that order.
+ *
+ * @return the value returned by the parent's shutDown()
+ * @throws Exception
+ * @see org.apache.activeblaze.Service#shutDown()
+ */
+ public boolean shutDown() throws Exception {
+     if (!super.shutDown()) {
+         return false;
+     }
+     this.group.shutDown();
+     this.groupManagementTransport.shutDown();
+     this.unicast.shutDown();
+     return true;
+ }
+
+ /**
+ * Start the channel. When the parent's start() reports true, the group management
+ * transport, the unicast processor and the group are started, in that order.
+ *
+ * @return the value returned by the parent's start()
+ * @throws Exception
+ * @see org.apache.activeblaze.Service#start()
+ */
+ public boolean start() throws Exception {
+     if (!super.start()) {
+         return false;
+     }
+     this.groupManagementTransport.start();
+     this.unicast.start();
+     this.group.start();
+     return true;
+ }
+
+ /**
+ * Stop the channel. When the parent's stop() reports true, the group, the group
+ * management transport and the unicast processor are stopped, in that order.
+ *
+ * @return the value returned by the parent's stop()
+ * @throws Exception
+ * @see org.apache.activeblaze.Service#stop()
+ */
+ public boolean stop() throws Exception {
+     if (!super.stop()) {
+         return false;
+     }
+     this.group.stop();
+     this.groupManagementTransport.stop();
+     this.unicast.stop();
+     return true;
+ }
+
+ /**
+ * @return the name this channel was constructed with
+ */
+ public String getName() {
+     final String channelName = this.name;
+     return channelName;
+ }
+
+ /**
+ * @return the inboxListener
+ */
+ public BlazeQueueListener getInboxListener() {
+ return this.inboxListener;
+ }
+
+ /**
+ * Replace the listener used for inbox messages.
+ *
+ * @param inboxListener
+ *            the inboxListener to set
+ */
+ public void setInboxListener(BlazeQueueListener inboxListener) {
+     final BlazeQueueListener replacement = inboxListener;
+     this.inboxListener = replacement;
+ }
+
+ /**
+ * @return this channel's configuration
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#getGroupConfiguration()
+ */
+ public BlazeGroupConfiguration getGroupConfiguration() {
+ return (BlazeGroupConfiguration) getConfiguration();
+ }
+
+ /**
+ * @return the member for this channel
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#getLocalMember()
+ */
+ public final MemberImpl getLocalMember() {
+ synchronized (this.localMutex) {
+ return this.local;
+ }
+ }
+
+ /**
+ * Register a membership listener by passing it on to the group.
+ *
+ * @param l
+ *            the listener to add
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#addMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
+ */
+ public void addMemberChangedListener(MemberChangedListener l) {
+     final Group target = this.group;
+     target.addMemberChangedListener(l);
+ }
+
+ /**
+ * Unregister a membership listener by passing it on to the group.
+ *
+ * @param l
+ *            the listener to remove
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#removeMemberChangedListener(org.apache.activeblaze.group.MemberChangedListener)
+ */
+ public void removeMemberChangedListener(MemberChangedListener l) {
+     final Group target = this.group;
+     target.removeMemberChangedListener(l);
+ }
+
+ /**
+ * @param id
+ * @return
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberById(java.lang.String)
+ */
+ public Member getMemberById(String id) {
+ return this.group.getMemberById(id);
+ }
+
+ /**
+ * @param name
+ * @return
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#getMemberByName(java.lang.String)
+ */
+ public Member getMemberByName(String name) {
+ return this.group.getMemberByName(name);
+ }
+
+ /**
+ * Will wait for a member to advertise itself if not available
+ *
+ * @param name
+ * @param timeout
+ * @return the member or null
+ * @throws InterruptedException
+ */
+ public Member getAndWaitForMemberByName(String name, int timeout) throws InterruptedException {
+ return this.group.getAndWaitForMemberByName(name, timeout);
+ }
+
+ /**
+ * @return the set of members as reported by the group
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#getMembers()
+ */
+ public Set<Member> getMembers() {
+     Set<Member> members = this.group.getMembers();
+     return members;
+ }
+
+ /**
+ * Send a message to one member chosen for the destination by
+ * {@link #getQueueDestination(Buffer)}. If no member is found, nothing is sent.
+ *
+ * @param destination
+ *            the name of the destination
+ * @param message
+ *            the message to send
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#send(java.lang.String, org.apache.activeblaze.BlazeMessage)
+ */
+ public void send(String destination, BlazeMessage message) throws Exception {
+     Buffer destinationKey = new Buffer(destination);
+     MemberImpl target = getQueueDestination(destinationKey);
+     if (target == null) {
+         return;
+     }
+     send(target, destinationKey, message);
+ }
+
+ /**
+ * @param member
+ * @param destination
+ * @param message
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#send(org.apache.activeblaze.group.Member,
+ * org.apache.activeblaze.BlazeMessage)
+ */
+ public void send(Member member, BlazeMessage message) throws Exception {
+ send((MemberImpl) member, new Buffer(member.getInBoxDestination()), message);
+ }
+
+ /**
+ * @param member
+ * @param message
+ * @return
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
+ * org.apache.activeblaze.BlazeMessage)
+ */
+ public BlazeMessage sendRequest(Member member, BlazeMessage message) throws Exception {
+ return sendRequest(member, message, 0);
+ }
+
+ /**
+ * @param member
+ * @param message
+ * @param timeout
+ * @return
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(org.apache.activeblaze.group.Member,
+ * org.apache.activeblaze.BlazeMessage, int)
+ */
+ public BlazeMessage sendRequest(Member member, BlazeMessage message, int timeout) throws Exception {
+ return sendRequest((MemberImpl) member, new Buffer(member.getInBoxDestination()), message, timeout);
+ }
+
+ /**
+ * Send a request to one member chosen for the destination by
+ * {@link #getQueueDestination(Buffer)}, passing a timeout value of 0.
+ *
+ * @param destination
+ *            the name of the destination
+ * @param message
+ *            the request message
+ * @return the result of the internal request
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
+ *      org.apache.activeblaze.BlazeMessage)
+ */
+ public BlazeMessage sendRequest(String destination, BlazeMessage message) throws Exception {
+     Buffer destinationKey = new Buffer(destination);
+     MemberImpl target = getQueueDestination(destinationKey);
+     BlazeMessage reply = sendRequest(target, destinationKey, message, 0);
+     return reply;
+ }
+
+ /**
+ * Send a request to one member chosen for the destination by
+ * {@link #getQueueDestination(Buffer)}.
+ *
+ * @param destination
+ *            the name of the destination
+ * @param message
+ *            the request message
+ * @param timeout
+ *            passed through to the internal request unchanged
+ * @return the result of the internal request
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#sendRequest(java.lang.String,
+ *      org.apache.activeblaze.BlazeMessage, int)
+ */
+ public BlazeMessage sendRequest(String destination, BlazeMessage message, int timeout) throws Exception {
+     Buffer destinationKey = new Buffer(destination);
+     MemberImpl target = getQueueDestination(destinationKey);
+     BlazeMessage reply = sendRequest(target, destinationKey, message, timeout);
+     return reply;
+ }
+
+ protected synchronized BlazeMessage sendRequest(MemberImpl member, Buffer destination, BlazeMessage message,
+ int timeout) throws Exception {
+ BlazeMessage result = null;
+ if (member != null) {
+ SendRequest request = new SendRequest();
+ message.storeContent();
+ BlazeData blazeData = message.getContent();
+ blazeData.setTopic(false);
+ blazeData.setDestination(destination);
+ PacketData packetData = getPacketData(blazeData.type(), blazeData);
+ synchronized (this.messageRequests) {
+ this.messageRequests.put(packetData.getMessageId(), request);
+ }
+ packetData.setFromAddress(this.inboxURI);
+ Packet packet = new Packet(packetData);
+ packet.setTo((member).getAddress());
+ this.unicast.downStream(packet);
+ packetData = (PacketData) request.get(timeout);
+ result = buildBlazeMessage(packetData);
+ }
+ return result;
+ }
+
+ /**
+ * Send a reply to a member over the unicast processor. The packet carries the
+ * given correlation id, is marked reliable and has this channel's inbox URI as
+ * its from-address.
+ *
+ * @param to
+ *            the member to reply to; it is cast to {@link MemberImpl}
+ * @param response
+ *            the reply message
+ * @param correlationId
+ *            the id set as correlation id on the outgoing packet
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#sendReply(org.apache.activeblaze.group.Member,
+ *      org.apache.activeblaze.BlazeMessage, java.lang.String)
+ */
+ public synchronized void sendReply(Member to, BlazeMessage response, String correlationId) throws Exception {
+     response.storeContent();
+     BlazeData content = response.getContent();
+     content.setTopic(false);
+     PacketData packetData = getPacketData(content.type(), content);
+     packetData.setCorrelationId(new Buffer(correlationId));
+     packetData.setReliable(true);
+     packetData.setFromAddress(this.inboxURI);
+     Packet outgoing = new Packet(packetData);
+     outgoing.setTo(((MemberImpl) to).getAddress());
+     this.unicast.downStream(outgoing);
+ }
+
+ /**
+ * Store the message's content and send that content to the member.
+ *
+ * @param member
+ *            the member to send to
+ * @param destination
+ *            the destination set on the outgoing data
+ * @param message
+ *            the message whose content is sent
+ * @throws Exception
+ */
+ protected void send(MemberImpl member, Buffer destination, BlazeMessage message) throws Exception {
+     message.storeContent();
+     BlazeData content = message.getContent();
+     send(member, destination, content);
+ }
+
+ /**
+ * Wrap the data in a BLAZE_DATA packet addressed to the member and pass it down
+ * the unicast processor. The data is marked as non-topic and gets the given
+ * destination; the packet's from-address is this channel's inbox URI.
+ *
+ * @param member
+ *            the member whose address the packet is sent to
+ * @param destination
+ *            the destination set on the data
+ * @param blazeData
+ *            the data to send
+ * @throws Exception
+ */
+ protected synchronized void send(MemberImpl member, Buffer destination, BlazeData blazeData) throws Exception {
+     blazeData.setTopic(false);
+     blazeData.setDestination(destination);
+     PacketData packetData = getPacketData(MessageType.BLAZE_DATA, blazeData);
+     packetData.setFromAddress(this.inboxURI);
+     Packet outgoing = new Packet(packetData);
+     outgoing.setTo(member.getAddress());
+     this.unicast.downStream(outgoing);
+ }
+
+ /**
+ * Register a queue listener for a destination and rebuild the local member so
+ * that it lists the new destination. init() is called first.
+ *
+ * @param destination
+ *            the name of the destination
+ * @param l
+ *            the listener to register
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#addBlazeQueueMessageListener(java.lang.String,
+ *      org.apache.activeblaze.group.BlazeQueueListener)
+ */
+ public void addBlazeQueueMessageListener(String destination, BlazeQueueListener l) throws Exception {
+     init();
+     this.queueMessageListenerMap.put(new Buffer(destination), l);
+     buildLocal();
+ }
+
+ /**
+ * Remove the queue listener registered for a destination and rebuild the local
+ * member. init() is called first.
+ *
+ * @param destination
+ *            the name of the destination
+ * @return the value removed from the listener map for that destination
+ * @throws Exception
+ * @see org.apache.activeblaze.group.BlazeGroupChannel#removeBlazeQueueMessageListener(java.lang.String)
+ */
+ public BlazeQueueListener removeBlazeQueueMessageListener(String destination) throws Exception {
+     init();
+     BlazeQueueListener removed = this.queueMessageListenerMap.remove(new Buffer(destination));
+     buildLocal();
+     return removed;
+ }
+
+ /**
+ * Register a topic listener through the parent class and then rebuild the local
+ * member. init() is called first.
+ *
+ * @param destination
+ *            the name of the topic destination
+ * @param l
+ *            the listener to register
+ * @throws Exception
+ * @see org.apache.activeblaze.BlazeChannel#addBlazeTopicMessageListener(java.lang.String,
+ *      org.apache.activeblaze.BlazeTopicListener)
+ */
+ public void addBlazeTopicMessageListener(String destination, BlazeTopicListener l) throws Exception {
+     init();
+     super.addBlazeTopicMessageListener(destination, l);
+     // re-publish the local member after the listener change
+     buildLocal();
+ }
+
+ /**
+ * Remove a topic listener through the parent class and then rebuild the local
+ * member. init() is called first.
+ *
+ * @param destination
+ *            the name of the topic destination
+ * @return the listener returned by the parent class
+ * @throws Exception
+ * @see org.apache.activeblaze.BlazeChannel#removeBlazeTopicMessageListener(java.lang.String)
+ */
+ public BlazeTopicListener removeBlazeTopicMessageListener(String destination) throws Exception {
+     init();
+     BlazeTopicListener removed = super.removeBlazeTopicMessageListener(destination);
+     buildLocal();
+     return removed;
+ }
+
+ /**
+ * Handle an incoming packet. Nothing happens unless the channel is started.
+ * A packet that answers a pending request is consumed by
+ * {@link #processRequest(Buffer, Message)}; otherwise it is dispatched on its
+ * message type (BLAZE_DATA or MEMBER_DATA, other types are ignored).
+ *
+ * @param id
+ *            not used by this implementation
+ * @param correlationId
+ *            the correlation id used to look up a pending request
+ * @param data
+ *            the incoming packet data
+ * @throws Exception
+ */
+ protected void processData(String id, Buffer correlationId, PacketData data) throws Exception {
+     if (!isStarted() || processRequest(correlationId, data)) {
+         return;
+     }
+     MessageType type = MessageType.valueOf(data.getType());
+     if (type == MessageType.BLAZE_DATA) {
+         doProcessBlazeData(data);
+         return;
+     }
+     if (type == MessageType.MEMBER_DATA) {
+         doProcessMemberData(data);
+     }
+ }
+
+ /**
+ * Hand a reply over to the pending request registered under the correlation id,
+ * removing that request from the pending-request map.
+ *
+ * @param correlationId
+ *            the id to look up; may be null
+ * @param value
+ *            the reply to pass to the pending request
+ * @return true if a pending request was found and given the reply, false otherwise
+ */
+ boolean processRequest(Buffer correlationId, Message<?> value) {
+     if (correlationId == null) {
+         return false;
+     }
+     SendRequest pending;
+     synchronized (this.messageRequests) {
+         pending = this.messageRequests.remove(correlationId);
+     }
+     if (pending == null) {
+         return false;
+     }
+     pending.put(correlationId, value);
+     return true;
+ }
+
+ /**
+ * Deliver an incoming BLAZE_DATA packet. Topic messages go to the parent class.
+ * Other messages go to the inbox listener when one is set and the producer id
+ * equals the message destination; otherwise to the first queue listener whose
+ * key matches the destination.
+ *
+ * @param data
+ *            the incoming packet data
+ * @throws Exception
+ */
+ protected void doProcessBlazeData(PacketData data) throws Exception {
+     BlazeMessage message = buildBlazeMessage(data);
+     if (message.getContent().getTopic()) {
+         super.processBlazeMessage(message);
+         return;
+     }
+     Buffer destination = message.getContent().getDestination();
+     if (this.inboxListener != null && this.producerId.equals(destination)) {
+         this.inboxListener.onMessage(message);
+         return;
+     }
+     for (Map.Entry<Buffer, BlazeQueueListener> candidate : this.queueMessageListenerMap.entrySet()) {
+         if (DestinationMatch.isMatch(destination, candidate.getKey())) {
+             // only the first matching listener receives the message
+             candidate.getValue().onMessage(message);
+             break;
+         }
+     }
+ }
+
+ /**
+ * @return the group object held by this channel
+ */
+ protected Group getGroup() {
+     Group current = this.group;
+     return current;
+ }
+
+ /**
+ * Create an empty group message whose sender is the member the group returns
+ * for the given id.
+ *
+ * @param fromId
+ *            the id of the sending member
+ * @return a new {@link BlazeGroupMessage}
+ */
+ protected BlazeMessage createMessage(String fromId) {
+     Member sender = this.group.getMemberById(fromId);
+     return new BlazeGroupMessage(sender);
+ }
+
+ /**
+ * Decode the packet's payload as member data and pass it to the group.
+ *
+ * @param data
+ *            the incoming packet data
+ * @throws Exception
+ */
+ protected void doProcessMemberData(PacketData data) throws Exception {
+     MemberData decoded = (MemberData) MessageType.MEMBER_DATA.createMessage();
+     decoded.mergeFramed(data.getPayload());
+     this.group.processMember(decoded);
+ }
+
+ /**
+ * Send a message to the group management address over the group management
+ * transport. The packet is marked as not reliable and carries this channel's
+ * inbox URI as its from-address.
+ *
+ * @param messageType
+ *            the type of the message
+ * @param message
+ *            the message to send
+ * @throws Exception
+ */
+ public synchronized void broadcastMessage(MessageType messageType, Message<?> message) throws Exception {
+     PacketData packetData = getPacketData(messageType, message);
+     packetData.setReliable(false);
+     packetData.setFromAddress(this.inboxURI);
+     Packet outgoing = new Packet(packetData);
+     outgoing.setTo(this.toManagementAddress);
+     this.groupManagementTransport.downStream(outgoing);
+ }
+
+ /**
+ * Send a message to the group management address and track it as a pending
+ * request. A new request is added to the async request and to the
+ * pending-request map under the packet's message id before the packet is sent.
+ *
+ * @param asyncRequest
+ *            the tracker that the new request is added to
+ * @param messageType
+ *            the type of the message
+ * @param message
+ *            the message to send
+ * @throws Exception
+ */
+ public synchronized void broadcastMessage(AsyncGroupRequest asyncRequest, MessageType messageType,
+         Message<?> message) throws Exception {
+     SendRequest pending = new SendRequest();
+     PacketData packetData = getPacketData(messageType, message);
+     asyncRequest.add(packetData.getMessageId(), pending);
+     synchronized (this.messageRequests) {
+         this.messageRequests.put(packetData.getMessageId(), pending);
+     }
+     packetData.setReliable(false);
+     packetData.setFromAddress(this.inboxURI);
+     Packet outgoing = new Packet(packetData);
+     outgoing.setTo(this.toManagementAddress);
+     this.groupManagementTransport.downStream(outgoing);
+ }
+
+ /**
+ * Send a message to an explicit address over the unicast processor. The packet
+ * is marked as not reliable and carries this channel's inbox URI as its
+ * from-address.
+ *
+ * @param to
+ *            the address to send to
+ * @param messageType
+ *            the type of the message
+ * @param message
+ *            the message to send
+ * @throws Exception
+ */
+ public synchronized void sendMessage(InetSocketAddress to, MessageType messageType, Message<?> message)
+         throws Exception {
+     PacketData packetData = getPacketData(messageType, message);
+     packetData.setReliable(false);
+     packetData.setFromAddress(this.inboxURI);
+     Packet outgoing = new Packet(packetData);
+     outgoing.setTo(to);
+     this.unicast.downStream(outgoing);
+ }
+
+ /**
+ * Send a reply of the given type to a member over the unicast processor. The
+ * packet carries the correlation id, is marked as not reliable and has this
+ * channel's inbox URI as its from-address.
+ *
+ * @param to
+ *            the member whose address the reply is sent to
+ * @param messageType
+ *            the type of the message
+ * @param message
+ *            the reply message
+ * @param correlationId
+ *            the id set as correlation id on the outgoing packet
+ * @throws Exception
+ */
+ public synchronized void sendReply(MemberImpl to,MessageType messageType, Message<?> message, String correlationId)
+         throws Exception {
+     PacketData packetData = getPacketData(messageType, message);
+     packetData.setCorrelationId(new Buffer(correlationId));
+     packetData.setReliable(false);
+     packetData.setFromAddress(this.inboxURI);
+     Packet outgoing = new Packet(packetData);
+     outgoing.setTo(to.getAddress());
+     this.unicast.downStream(outgoing);
+ }
+
+ /**
+ * Choose the member that should receive the next message for a destination.
+ * <p>
+ * The group's queue map is first searched for the exact destination; if there is
+ * no entry, the first key that matches the destination according to
+ * {@link DestinationMatch#isMatch} is used. The chosen member is moved from the
+ * head of its list to the tail, which gives round-robin selection.
+ *
+ * @param destination
+ *            the destination to find a member for
+ * @return the chosen member, or null if no list was found or the list is empty
+ */
+ protected MemberImpl getQueueDestination(Buffer destination) {
+     Map<Buffer, List<MemberImpl>> map = this.group.getQueueMap();
+     List<MemberImpl> list = map.get(destination);
+     if (list == null) {
+         // search through wildcard matches
+         for (Map.Entry<Buffer, List<MemberImpl>> entry : map.entrySet()) {
+             if (DestinationMatch.isMatch(destination, entry.getKey())) {
+                 // use the list stored under the matching key; looking up the
+                 // original destination again can only return null here
+                 list = entry.getValue();
+                 break;
+             }
+         }
+     }
+     if (list == null || list.isEmpty()) {
+         return null;
+     }
+     // round-robin: take the head and append it at the tail
+     MemberImpl result = list.remove(0);
+     list.add(result);
+     return result;
+ }
+
+ /**
+ * Rebuild the local member from the currently registered topic and queue
+ * listeners, tell the group about it and broadcast a heartbeat for it.
+ * <p>
+ * Exceptions raised during the rebuild are logged and not propagated.
+ *
+ * @throws BlazeRuntimeException
+ * if the channel is not initialized
+ */
+ protected void buildLocal() {
+ if (isInitialized()) {
+ try {
+ synchronized (this.localMutex) {
+ // work on a clone of the current member's data, with its destination list cleared
+ MemberImpl result = new MemberImpl(getLocalMember().getData().clone());
+ result.getData().clearDestination();
+ // add topic destinations
+ for (Buffer destination : this.topicessageListenerMap.keySet()) {
+ DestinationData data = new DestinationData();
+ data.setDestination(destination);
+ data.setTopic(true);
+ result.getData().addDestination(data);
+ }
+ // add Queue Destinations
+ for (Buffer destination : this.queueMessageListenerMap.keySet()) {
+ DestinationData data = new DestinationData();
+ data.setDestination(destination);
+ data.setTopic(false);
+ result.getData().addDestination(data);
+ }
+ // hand the old and the new member to the group
+ this.group.processMemberUpdate(this.local, result);
+ // the destinations-changed flag is set only for the heartbeat broadcast
+ result.getData().setDestinationsChanged(true);
+ this.group.broadcastHeartBeat(result);
+ result.getData().clearDestinationsChanged();
+ // the rebuilt member becomes the local member
+ this.local = result;
+ this.group.updateLocal(this.local);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to update local member ", e);
+ }
+ } else {
+ throw new BlazeRuntimeException("Not Initialized");
+ }
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.BlazeConfiguration;
+
+/**
+ * Configuration for a BlazeGroupChannel
+ *
+ */
+public class BlazeGroupConfiguration extends BlazeConfiguration {
+ private String groupManagementURI = "mcast://224.2.2.2:8888";
+
+ private int heartBeatInterval = 1000;
+
+ /**
+ * @return the groupManagementUTI
+ */
+ public String getGroupManagementURI() {
+ return this.groupManagementURI;
+ }
+
+ /**
+ * @param groupManagementURI
+ * the groupManagementURI to set
+ */
+ public void setGroupManagementURI(String groupManagementURI) {
+ this.groupManagementURI = groupManagementURI;
+ }
+
+
+ /**
+ * @return the heartBeatInterval
+ */
+ public int getHeartBeatInterval() {
+ return this.heartBeatInterval;
+ }
+
+ /**
+ * @param heartBeatInterval the heartBeatInterval to set
+ */
+ public void setHeartBeatInterval(int heartBeatInterval) {
+ this.heartBeatInterval = heartBeatInterval;
+ }
+
+ protected BlazeConfiguration newInstance() {
+ return new BlazeGroupConfiguration();
+ }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.BlazeException;
+import org.apache.activeblaze.BlazeMessage;
+
+/**
+ * A BlazeMessage that also carries the Member that sent it.
+ * This type of message is created on the receiver.
+ */
+public class BlazeGroupMessage extends BlazeMessage {
+    /** The sender recorded for this message. */
+    private final Member sender;
+
+    /**
+     * @param sender the sender to record for this message
+     */
+    public BlazeGroupMessage(Member sender) {
+        this.sender = sender;
+    }
+
+    /**
+     * @return a new BlazeGroupMessage with the same sender, passed through copy(BlazeMessage)
+     * @throws BlazeException as declared; presumably raised by copy(BlazeMessage) - confirm
+     */
+    public BlazeMessage copy() throws BlazeException {
+        BlazeMessage duplicate = new BlazeGroupMessage(sender);
+        copy(duplicate);
+        return duplicate;
+    }
+
+    /** @return the sender */
+    public Member getSender() {
+        return sender;
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.BlazeMessage;
+
+
+/**
+ * A listener for BlazeMessages.
+ * Receives messages through {@link #onMessage(BlazeMessage)}.
+ */
+public interface BlazeQueueListener {
+    /**
+     * Called when a message is available to be processed.
+     *
+     * @param message the message that is available
+     */
+    void onMessage(BlazeMessage message);
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeQueueListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java?rev=719706&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java Fri Nov 21 12:44:40 2008
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.group;
+
+import org.apache.activeblaze.wire.DestinationData;
+import org.apache.activemq.protobuf.Buffer;
+
+/**
+ * Represents a Destination; equality uses the member id and destination, not the topic flag.
+ */
+class Destination {
+    private final boolean topic;
+    private final Buffer destination;
+    private final MemberImpl member;
+
+    /**
+     * @param member the member stored with this destination
+     * @param data supplies the topic flag and the destination
+     */
+    Destination(MemberImpl member, DestinationData data) {
+        this.member = member;
+        this.topic = data.getTopic();
+        this.destination = data.getDestination();
+    }
+
+    /** @return the topic */
+    public boolean isTopic() {
+        return this.topic;
+    }
+
+    /** @return the destination */
+    public Buffer getDestination() {
+        return this.destination;
+    }
+
+    /** @return the member */
+    public MemberImpl getMember() {
+        return this.member;
+    }
+
+    /** @return the member id hash XORed with the destination hash */
+    @Override
+    public int hashCode() {
+        return getMember().getId().hashCode() ^ getDestination().hashCode();
+    }
+
+    /** @return true only for a Destination with an equal member id and an equal destination */
+    @Override
+    public boolean equals(Object object) {
+        if (!(object instanceof Destination)) {
+            return false;
+        }
+        Destination other = (Destination) object;
+        return other.getMember().getId().equals(this.member.getId())
+                && other.destination.equals(this.destination);
+    }
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Destination.java
------------------------------------------------------------------------------
svn:eol-style = native