You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/11/25 18:22:52 UTC
svn commit: r720544 - in /activemq/activemq-blaze/trunk/src:
main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/cluster/
main/java/org/apache/activeblaze/group/ main/proto/
test/java/org/apache/activeblaze/cluster/
Author: rajdavies
Date: Tue Nov 25 09:22:50 2008
New Revision: 720544
URL: http://svn.apache.org/viewvc?rev=720544&view=rev
Log:
rename coordinated package to cluster
Added:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java
- copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java
- copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java
- copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java
- copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java (with props)
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java
- copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java
- copied, changed from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java
- copied, changed from r720506, activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java
Removed:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatorChangedListener.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java
activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java
Modified:
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/BlazeChannelImpl.java Tue Nov 25 09:22:50 2008
@@ -51,7 +51,7 @@
protected AtomicLong sequence = new AtomicLong();
protected AtomicLong session = new AtomicLong(1);
private Processor broadcast;
- private BlazeConfiguration configuration = new BlazeConfiguration();
+ protected BlazeConfiguration configuration = new BlazeConfiguration();
private String id;
private Buffer managementURI;
private InetSocketAddress toAddress;
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java (from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannel.java Tue Nov 25 09:22:50 2008
@@ -23,38 +23,38 @@
* (elected leader) for the group
*
*/
-public interface BlazeCoordinatedGroupChannel extends BlazeGroupChannel{
+public interface BlazeClusterGroupChannel extends BlazeGroupChannel{
/**
* @return true if this Channel is the coordinator of the group
* @throws Exception
*/
- public boolean isCoordinator() throws Exception;
+ public boolean isMaster() throws Exception;
/**
* @return the member of the group which is the coordinator
* @throws Exception
*/
- public Member getCoordinator() throws Exception;
+ public Member getMaster() throws Exception;
/**
- * Add a listener for membership changes
+ * Add a listener for cluster changes
*
* @param l
* @throws Exception
*/
- public void addCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception;
+ public void addClusterChangedListener(ClusterChangedListener l) throws Exception;
/**
- * Remove a listener for membership changes
+ * Remove a listener for cluster changes
*
* @param l
* @throws Exception
*/
- public void removeCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception;
+ public void removeClusterChangedListener(ClusterChangedListener l) throws Exception;
/**
* @return the configuration
*/
- public BlazeCoordinatedGroupConfiguration getCoordinatedGroupConfiguration();
+ public BlazeClusterGroupConfiguration getConfiguration();
/**
* waits for election in the group to finish
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java (from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelFactory.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelFactory.java Tue Nov 25 09:22:50 2008
@@ -23,20 +23,20 @@
/**
* Factory class for creating <Code>BlazeGroupChannel</CODE>
*/
-public class BlazeCoordinatedGroupChannelFactory extends BlazeGroupChannelFactory {
+public class BlazeClusterGroupChannelFactory extends BlazeGroupChannelFactory {
/**
* Default Constructor
*/
- public BlazeCoordinatedGroupChannelFactory() {
- super(new BlazeCoordinatedGroupConfiguration());
+ public BlazeClusterGroupChannelFactory() {
+ super(new BlazeClusterGroupConfiguration());
}
/**
* Construct a factory to use the passed Configuration
* @param config
*/
- public BlazeCoordinatedGroupChannelFactory(BlazeCoordinatedGroupConfiguration config){
+ public BlazeClusterGroupChannelFactory(BlazeClusterGroupConfiguration config){
super(config);
}
@@ -46,8 +46,8 @@
* @return the Channel
* @throws Exception
*/
- public BlazeCoordinatedGroupChannel createChannel(String name) throws Exception {
- BlazeCoordinatedGroupChannelImpl result = new BlazeCoordinatedGroupChannelImpl(name);
+ public BlazeClusterGroupChannel createChannel(String name) throws Exception {
+ BlazeClusterGroupChannelImpl result = new BlazeClusterGroupChannelImpl(name);
result.setConfiguration(getConfiguration().copy());
return result;
}
@@ -55,7 +55,7 @@
/**
* @return the configuration
*/
- public BlazeCoordinatedGroupConfiguration getConfiguration() {
- return (BlazeCoordinatedGroupConfiguration) super.getConfiguration();
+ public BlazeClusterGroupConfiguration getConfiguration() {
+ return (BlazeClusterGroupConfiguration) super.getConfiguration();
}
}
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java (from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelImpl.java Tue Nov 25 09:22:50 2008
@@ -33,65 +33,65 @@
* (elected leader) for the group
*
*/
-public class BlazeCoordinatedGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeCoordinatedGroupChannel {
- private static final Log LOG = LogFactory.getLog(BlazeCoordinatedGroupChannelImpl.class);
- private CoordinatedGroup coordinatedGroup;
+public class BlazeClusterGroupChannelImpl extends BlazeGroupChannelImpl implements BlazeClusterGroupChannel {
+ private static final Log LOG = LogFactory.getLog(BlazeClusterGroupChannelImpl.class);
+ private ClusterGroup coordinatedGroup;
/**
* Constructor
*
* @param name
*/
- public BlazeCoordinatedGroupChannelImpl(String name) {
+ public BlazeClusterGroupChannelImpl(String name) {
super(name);
}
/**
* @param l
* @throws Exception
- * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#addCoordinatorChangedListener(org.apache.activeblaze.cluster.CoordinatorChangedListener)
+ * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#addClusterChangedListener(org.apache.activeblaze.cluster.ClusterChangedListener)
*/
- public void addCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception {
+ public void addClusterChangedListener(ClusterChangedListener l) throws Exception {
init();
- this.coordinatedGroup.addCoordinatorChangedListener(l);
+ this.coordinatedGroup.addClusterChangedListener(l);
}
/**
* @return
* @throws Exception
- * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#getCoordinator()
+ * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getMaster()
*/
- public Member getCoordinator() throws Exception {
+ public Member getMaster() throws Exception {
init();
- return this.coordinatedGroup.getCoordinator();
+ return this.coordinatedGroup.getMaster();
}
/**
* @return
* @throws Exception
- * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#isCoordinator()
+ * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#isMaster()
*/
- public boolean isCoordinator() throws Exception {
+ public boolean isMaster() throws Exception {
init();
- return this.coordinatedGroup.isCoordinatorMatch();
+ return this.coordinatedGroup.isMasterMatch();
}
/**
* @param l
* @throws Exception
- * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.CoordinatorChangedListener)
+ * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#removeMemberChangedListener(org.apache.activeblaze.cluster.ClusterChangedListener)
*/
- public void removeCoordinatorChangedListener(CoordinatorChangedListener l) throws Exception {
+ public void removeClusterChangedListener(ClusterChangedListener l) throws Exception {
init();
- this.coordinatedGroup.removeCoordinatorChangedListener(l);
+ this.coordinatedGroup.removeClusterChangedListener(l);
}
/**
* @return
- * @see org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel#getCoordinatedGroupConfiguration()
+ * @see org.apache.activeblaze.cluster.BlazeClusterGroupChannel#getCoordinatedGroupConfiguration()
*/
- public BlazeCoordinatedGroupConfiguration getCoordinatedGroupConfiguration() {
- return (BlazeCoordinatedGroupConfiguration) getGroupConfiguration();
+ public BlazeClusterGroupConfiguration getConfiguration() {
+ return (BlazeClusterGroupConfiguration) this.configuration;
}
/**
@@ -121,11 +121,11 @@
}
protected MemberImpl createLocal(URI uri) throws Exception {
- return new MemberImpl(getId(), getName(), getCoordinatedGroupConfiguration().getCoordinatorWeight(), uri);
+ return new MemberImpl(getId(), getName(), getConfiguration().getMasterWeight(), uri);
}
protected Group createGroup() {
- this.coordinatedGroup = new CoordinatedGroup(this);
+ this.coordinatedGroup = new ClusterGroup(this);
return this.coordinatedGroup;
}
@@ -134,7 +134,7 @@
ElectionMessage electionMessage = (ElectionMessage) type.createMessage();
Buffer payload = data.getPayload();
electionMessage.mergeFramed(payload);
- CoordinatedGroup group = (CoordinatedGroup) getGroup();
+ ClusterGroup group = (ClusterGroup) getGroup();
group.processElectionMessage(electionMessage, id);
}
}
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java (from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/BlazeClusterGroupConfiguration.java Tue Nov 25 09:22:50 2008
@@ -23,8 +23,8 @@
* Configuration for a BlazeCoordinatedGroupChannel
*
*/
-public class BlazeCoordinatedGroupConfiguration extends BlazeGroupConfiguration{
- private long coordinatorWeight = 0;
+public class BlazeClusterGroupConfiguration extends BlazeGroupConfiguration{
+ private long masterWeight = 0;
private int minimumGroupSize = 1;
private long awaitGroupTimeout = getHeartBeatInterval()*2;
@@ -32,15 +32,15 @@
/**
* @return the coordinatorWeight
*/
- public long getCoordinatorWeight() {
- return this.coordinatorWeight;
+ public long getMasterWeight() {
+ return this.masterWeight;
}
/**
* @param coordinatorWeight the coordinatorWeight to set
*/
- public void setCoordinatorWeight(long coordinatorWeight) {
- this.coordinatorWeight = coordinatorWeight;
+ public void setMasterWeight(long coordinatorWeight) {
+ this.masterWeight = coordinatorWeight;
}
/**
@@ -72,6 +72,6 @@
}
protected BlazeConfiguration newInstance() {
- return new BlazeCoordinatedGroupConfiguration();
+ return new BlazeClusterGroupConfiguration();
}
}
Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java?rev=720544&view=auto
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java (added)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java Tue Nov 25 09:22:50 2008
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activeblaze.cluster;
+
+import org.apache.activeblaze.group.Member;
+
+/**
+ * A listener for coordinator changes to a group
+ *
+ */
+public interface ClusterChangedListener {
+ /**
+ * Fired when a master changes in the group
+ * @param master the new master of the cluster
+ */
+ void ClusterChanged(Member master);
+}
Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterChangedListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java (from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/CoordinatedGroup.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterGroup.java Tue Nov 25 09:22:50 2008
@@ -41,13 +41,13 @@
* Implementation of Group State
*
*/
-public class CoordinatedGroup extends Group {
- static final Log LOG = LogFactory.getLog(CoordinatedGroup.class);
- final BlazeCoordinatedGroupChannelImpl channel;
- private final BlazeCoordinatedGroupConfiguration configuration;
+public class ClusterGroup extends Group {
+ static final Log LOG = LogFactory.getLog(ClusterGroup.class);
+ final BlazeClusterGroupChannelImpl channel;
+ private final BlazeClusterGroupConfiguration configuration;
private ThreadPoolExecutor electionExecutor;
- private MemberImpl coordinator;
- private List<CoordinatorChangedListener> listeners = new CopyOnWriteArrayList<CoordinatorChangedListener>();
+ private MemberImpl master;
+ private List<ClusterChangedListener> listeners = new CopyOnWriteArrayList<ClusterChangedListener>();
final AtomicBoolean electionFinished = new AtomicBoolean(false);
private long startTime;
@@ -59,11 +59,11 @@
* @param transport
* @param config
*/
- protected CoordinatedGroup(BlazeCoordinatedGroupChannelImpl channel) {
+ protected ClusterGroup(BlazeClusterGroupChannelImpl channel) {
super(channel);
this.channel = channel;
- this.coordinator = this.channel.getLocalMember();
- this.configuration = channel.getCoordinatedGroupConfiguration();
+ this.master = this.channel.getLocalMember();
+ this.configuration = channel.getConfiguration();
}
/**
@@ -78,8 +78,7 @@
this.electionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "Election{" + CoordinatedGroup.this.channel.getId()
- + "}");
+ Thread thread = new Thread(runnable, "Election{" + ClusterGroup.this.channel.getId() + "}");
thread.setDaemon(true);
return thread;
}
@@ -161,20 +160,24 @@
/**
* @return true if the coordinator for the map
*/
- protected boolean isCoordinatorMatch() {
- String coordinatorId = this.coordinator != null ? this.coordinator.getId() : "";
- return this.channel.getId().equals(coordinatorId);
+ protected boolean isMasterMatch() {
+ String masterId = this.master != null ? this.master.getId() : "";
+ return this.channel.getId().equals(masterId);
}
- protected MemberImpl getCoordinator() {
- return this.coordinator;
+ protected MemberImpl getMaster() {
+ return this.master;
}
- protected void setCoordinator(MemberImpl member) {
- this.coordinator = member;
+ protected void setMaster(MemberImpl member) {
+ MemberImpl oldMaster = this.master;
+ this.master = member;
+ if (oldMaster == null || (oldMaster != null && this.master != null && !this.master.equals(oldMaster))) {
+ fireClusterChanged(this.master);
+ }
}
- protected void addCoordinatorChangedListener(CoordinatorChangedListener l) {
+ protected void addClusterChangedListener(ClusterChangedListener l) {
this.listeners.add(l);
}
@@ -184,19 +187,19 @@
* @param l
* @throws Exception
*/
- protected void removeCoordinatorChangedListener(CoordinatorChangedListener l) {
+ protected void removeClusterChangedListener(ClusterChangedListener l) {
this.listeners.remove(l);
}
- protected void fireCoordinatorChanged(MemberImpl newCoordinator) {
- for (CoordinatorChangedListener l : this.listeners) {
- l.coordinatorChanged(newCoordinator);
+ protected void fireClusterChanged(MemberImpl newMaster) {
+ for (ClusterChangedListener l : this.listeners) {
+ l.ClusterChanged(newMaster);
}
}
boolean callElection() throws Exception {
List<MemberImpl> members = new ArrayList<MemberImpl>(this.members.values());
- List<MemberImpl> sorted = CoordinatedGroup.sortMemberList(members);
+ List<MemberImpl> sorted = ClusterGroup.sortMemberList(members);
AsyncGroupRequest request = new AsyncGroupRequest();
boolean doCall = false;
for (Member member : sorted) {
@@ -216,16 +219,17 @@
void processElectionMessage(ElectionMessage msg, String correlationId) throws Exception {
MemberImpl from = new MemberImpl(msg.getMember());
if (from != null && !from.getId().equals(getLocalMember().getId())) {
- LOG.debug(getLocalMember()+" Election message "+ msg.getElectionType() + " from " + from);
+ LOG.debug(getLocalMember() + " Election message " + msg.getElectionType() + " from " + from);
if (msg.getElectionType().equals(ElectionType.ELECTION)) {
ElectionMessage reply = new ElectionMessage();
reply.setElectionType(ElectionType.ANSWER);
reply.setMember(this.channel.getLocalMember().getData());
this.channel.sendReply(from, msg.type(), reply, correlationId);
election(null, false);
- } else if (msg.getElectionType().equals(ElectionType.COORDINATOR)) {
- this.coordinator=from;
- LOG.debug(getLocalMember()+" Coordinator is "+ from);
+ } else if (msg.getElectionType().equals(ElectionType.MASTER)) {
+ setMaster(from);
+ LOG.debug(getLocalMember() + " Master is " + from);
+ setMaster(from);
setElectionFinished(true);
}
}
@@ -262,7 +266,6 @@
return !isStopped() && this.electionFinished.get();
}
-
protected static List<MemberImpl> sortMemberList(List<MemberImpl> list) {
Collections.sort(list, new Comparator<Member>() {
public int compare(Member m1, Member m2) {
@@ -279,7 +282,7 @@
/**
* @return the configuration
*/
- public BlazeCoordinatedGroupConfiguration getConfiguration() {
+ public BlazeClusterGroupConfiguration getConfiguration() {
return this.configuration;
}
}
Copied: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java (from r720506, activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java?p2=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java&p1=activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/GroupState.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ClusterState.java Tue Nov 25 09:22:50 2008
@@ -61,10 +61,10 @@
*
*/
-public class GroupState<String,V> implements Map<String,V>{
+public class ClusterState<String,V> implements Map<String,V>{
- private final BlazeCoordinatedGroupChannelImpl channel;
- protected GroupState(BlazeCoordinatedGroupChannelImpl channel) {
+ private final BlazeClusterGroupChannelImpl channel;
+ protected ClusterState(BlazeClusterGroupChannelImpl channel) {
this.channel=channel;
}
/**
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/cluster/ElectionService.java Tue Nov 25 09:22:50 2008
@@ -17,8 +17,6 @@
package org.apache.activeblaze.cluster;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import org.apache.activeblaze.BaseService;
import org.apache.activeblaze.group.Member;
@@ -34,9 +32,9 @@
*/
class ElectionService extends BaseService implements Runnable {
private static final Log LOG = LogFactory.getLog(ElectionService.class);
- private final CoordinatedGroup group;
+ private final ClusterGroup group;
private Member member;
- ElectionService(CoordinatedGroup group,Member member, boolean memberStarted) {
+ ElectionService(ClusterGroup group,Member member, boolean memberStarted) {
this.group=group;
this.member = member;
}
@@ -59,9 +57,9 @@
;
if (this.group.isStarted() && isStarted()) {
- this.group.setCoordinator(selectCordinator(members));
- if (this.group.isCoordinatorMatch()) {
- this.group.broadcastElectionType(ElectionType.COORDINATOR);
+ this.group.setMaster(selectCordinator(members));
+ if (this.group.isMasterMatch()) {
+ this.group.broadcastElectionType(ElectionType.MASTER);
}
if (!this.group.isElectionFinished() && isStarted()) {
//ok - lets just wait for more members to show
@@ -75,17 +73,17 @@
}
if (!this.group.isElectionFinished() && isStarted()) {
// we must be the coordinator
- this.group.setCoordinator(this.group.getLocalMember());
+ this.group.setMaster(this.group.getLocalMember());
this.group.setElectionFinished(true);
LOG.debug(this.group.getLocalMember()+" We are the Coordinator ");
- this.group.broadcastElectionType(ElectionType.COORDINATOR);
+ this.group.broadcastElectionType(ElectionType.MASTER);
}
}
}
}
protected MemberImpl selectCordinator(List<MemberImpl> list) throws Exception {
- List<MemberImpl> sorted = CoordinatedGroup.sortMemberList(list);
+ List<MemberImpl> sorted = ClusterGroup.sortMemberList(list);
MemberImpl result = sorted.isEmpty() ? this.group.getLocalMember() : sorted
.get(list.size() - 1);
return result;
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannel.java Tue Nov 25 09:22:50 2008
@@ -122,7 +122,7 @@
/**
* @return the configuration
*/
- public BlazeGroupConfiguration getGroupConfiguration();
+ public BlazeGroupConfiguration getConfiguration();
/**
* @return a set of the members
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupChannelImpl.java Tue Nov 25 09:22:50 2008
@@ -96,7 +96,7 @@
// so need to get the potentially new value
unicastURI = transport.getLocalURI();
// append configuration properties
- String groupManagementURIStr = getGroupConfiguration().getGroupManagementURI();
+ String groupManagementURIStr = getConfiguration().getGroupManagementURI();
groupManagementURIStr = PropertyUtil.addPropertiesToURIFromBean(groupManagementURIStr, getConfiguration());
URI groupManagementURI = new URI(groupManagementURIStr);
this.toManagementAddress = new InetSocketAddress(groupManagementURI.getHost(), groupManagementURI.getPort());
@@ -190,8 +190,8 @@
* @return this channel's configuration
* @see org.apache.activeblaze.group.BlazeGroupChannel#getGroupConfiguration()
*/
- public BlazeGroupConfiguration getGroupConfiguration() {
- return (BlazeGroupConfiguration) getConfiguration();
+ public BlazeGroupConfiguration getConfiguration() {
+ return (BlazeGroupConfiguration) this.configuration;
}
/**
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/BlazeGroupConfiguration.java Tue Nov 25 09:22:50 2008
@@ -25,7 +25,7 @@
public class BlazeGroupConfiguration extends BlazeConfiguration {
private String groupManagementURI = "mcast://224.2.2.2:8888";
- private int heartBeatInterval = 1000;
+ private int heartBeatInterval = 250;
/**
* @return the groupManagementUTI
Modified: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java (original)
+++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/group/Group.java Tue Nov 25 09:22:50 2008
@@ -60,14 +60,14 @@
*/
protected Group(BlazeGroupChannelImpl channel) {
this.channel = channel;
- this.configuration = channel.getGroupConfiguration();
+ this.configuration = channel.getConfiguration();
}
/**
* @return the Member of the Channel
* @throws Exception
*/
- public MemberImpl getLocalMember() throws Exception {
+ public MemberImpl getLocalMember(){
return this.channel.getLocalMember();
}
@@ -268,6 +268,10 @@
}
return result;
}
+
+ public String toString() {
+ return "Group "+getLocalMember().getName();
+ }
/**
* Process a new member
Modified: activemq/activemq-blaze/trunk/src/main/proto/blaze.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/proto/blaze.proto?rev=720544&r1=720543&r2=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/main/proto/blaze.proto (original)
+++ activemq/activemq-blaze/trunk/src/main/proto/blaze.proto Tue Nov 25 09:22:50 2008
@@ -78,7 +78,7 @@
enum ElectionType {
ELECTION = 0;
ANSWER = 1;
- COORDINATOR = 2;
+ MASTER = 2;
}
message ElectionMessage {
//| option java_implments = "org.apache.activeblaze.impl.processor.PacketMessageType";
Copied: activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java (from r720506, activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java?p2=activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java&p1=activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java&r1=720506&r2=720544&rev=720544&view=diff
==============================================================================
--- activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeCoordinatedGroupChannelTest.java (original)
+++ activemq/activemq-blaze/trunk/src/test/java/org/apache/activeblaze/cluster/BlazeClusterGroupChannelTest.java Tue Nov 25 09:22:50 2008
@@ -18,86 +18,118 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannel;
-import org.apache.activeblaze.cluster.BlazeCoordinatedGroupChannelFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannel;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannelFactory;
+import org.apache.activeblaze.group.Member;
import junit.framework.TestCase;
/**
- * Test for coordinated channel
+ * Test for clustered channel
*
*/
-public class BlazeCoordinatedGroupChannelTest extends TestCase {
+public class BlazeClusterGroupChannelTest extends TestCase {
+
public void testGroup() throws Exception {
final int number = 3;
- List<BlazeCoordinatedGroupChannel> channels = new ArrayList<BlazeCoordinatedGroupChannel>();
- BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory();
+ List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+ BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
for (int i = 0; i < number; i++) {
- BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i);
- channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number);
+ BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
+ channel.getConfiguration().setMinimumGroupSize(number);
channel.start();
channels.add(channel);
}
channels.get(number - 1).waitForElection(5000);
- int coordinatorNumber = 0;
- BlazeCoordinatedGroupChannel coordinator = null;
- for (BlazeCoordinatedGroupChannel channel : channels) {
- if (channel.isCoordinator()) {
- coordinatorNumber++;
- coordinator = channel;
- }
- }
- assertNotNull(coordinator);
- assertEquals(1, coordinatorNumber);
- // kill the coordinator
- coordinator.shutDown();
- Thread.sleep(factory.getConfiguration().getHeartBeatInterval() * 2);
- coordinatorNumber = 0;
- coordinator = null;
- for (BlazeCoordinatedGroupChannel channel : channels) {
- if (channel.isCoordinator()) {
- coordinatorNumber++;
- coordinator = channel;
- }
- }
- assertNotNull(coordinator);
- assertEquals(1, coordinatorNumber);
- for (BlazeCoordinatedGroupChannel channel : channels) {
+ int masterNumber = 0;
+ BlazeClusterGroupChannel master = null;
+ for (BlazeClusterGroupChannel channel : channels) {
+ if (channel.isMaster()) {
+ masterNumber++;
+ master = channel;
+ }
+ }
+ assertNotNull(master);
+ assertEquals(1, masterNumber);
+ // kill the master
+ master.shutDown();
+ Thread.sleep(1000);
+ masterNumber = 0;
+ master = null;
+ for (BlazeClusterGroupChannel channel : channels) {
+ if (channel.isMaster()) {
+ masterNumber++;
+ master = channel;
+ }
+ }
+ assertNotNull(master);
+ assertEquals(1, masterNumber);
+ for (BlazeClusterGroupChannel channel : channels) {
channel.shutDown();
}
}
public void testWeightedGroup() throws Exception {
final int number = 4;
- List<BlazeCoordinatedGroupChannel> channels = new ArrayList<BlazeCoordinatedGroupChannel>();
- BlazeCoordinatedGroupChannelFactory factory = new BlazeCoordinatedGroupChannelFactory();
- BlazeCoordinatedGroupChannel weightedCoordinator = null;
+ List<BlazeClusterGroupChannel> channels = new ArrayList<BlazeClusterGroupChannel>();
+ BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+ BlazeClusterGroupChannel weightedMaster = null;
for (int i = 0; i < number; i++) {
- BlazeCoordinatedGroupChannel channel = factory.createChannel("test" + i);
- channel.getCoordinatedGroupConfiguration().setMinimumGroupSize(number);
+ BlazeClusterGroupChannel channel = factory.createChannel("test" + i);
+ channel.getConfiguration().setMinimumGroupSize(number);
if (i == number / 2) {
- channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(10);
- weightedCoordinator=channel;
- }else {
- channel.getCoordinatedGroupConfiguration().setCoordinatorWeight(0);
- }
- channel.start();
- channels.add(channel);
-
+ channel.getConfiguration().setMasterWeight(10);
+ weightedMaster = channel;
+ } else {
+ channel.getConfiguration().setMasterWeight(0);
+ }
+ channel.start();
+ channels.add(channel);
}
channels.get(number - 1).waitForElection(5000);
- int coordinatorNumber = 0;
- BlazeCoordinatedGroupChannel coordinator = null;
- for (BlazeCoordinatedGroupChannel channel : channels) {
- if (channel.isCoordinator()) {
- coordinatorNumber++;
- coordinator = channel;
+ int masterNumber = 0;
+ BlazeClusterGroupChannel master = null;
+ for (BlazeClusterGroupChannel channel : channels) {
+ if (channel.isMaster()) {
+ masterNumber++;
+ master = channel;
}
}
- assertNotNull(coordinator);
- assertTrue(coordinator==weightedCoordinator);
- assertEquals(1, coordinatorNumber);
- for (BlazeCoordinatedGroupChannel channel : channels) {
+ assertNotNull(master);
+ assertTrue(master == weightedMaster);
+ assertEquals(1, masterNumber);
+ for (BlazeClusterGroupChannel channel : channels) {
channel.shutDown();
}
}
+
+ public void testClusterChangedListener() throws Exception {
+ final AtomicBoolean result = new AtomicBoolean();
+ BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory();
+ BlazeClusterGroupChannel master = factory.createChannel("master");
+ master.getConfiguration().setMasterWeight(10);
+ master.start();
+
+ BlazeClusterGroupChannel channel = factory.createChannel("test1");
+ channel.addClusterChangedListener(new ClusterChangedListener() {
+ public void ClusterChanged(Member master) {
+ synchronized(result) {
+ result.set(true);
+ result.notifyAll();
+ }
+
+ }
+ });
+ channel.start();
+
+ synchronized(result) {
+ if (!result.get()) {
+ result.wait(3000);
+ }
+ }
+ assertTrue(result.get());
+ channel.shutDown();
+ master.shutDown();
+
+ }
}