You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/12/02 21:54:32 UTC
svn commit: r722615 - in /activemq/trunk/kahadb: ./
src/main/java/org/apache/kahadb/replication/blaze/
src/test/java/org/apache/kahadb/replication/blaze/
Author: rajdavies
Date: Tue Dec 2 12:54:32 2008
New Revision: 722615
URL: http://svn.apache.org/viewvc?rev=722615&view=rev
Log:
Add activeBlaze replication cluster state manager
Added:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java (with props)
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java (with props)
Modified:
activemq/trunk/kahadb/pom.xml
Modified: activemq/trunk/kahadb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/pom.xml?rev=722615&r1=722614&r2=722615&view=diff
==============================================================================
--- activemq/trunk/kahadb/pom.xml (original)
+++ activemq/trunk/kahadb/pom.xml Tue Dec 2 12:54:32 2008
@@ -105,7 +105,13 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activeblaze</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <optional>true</optional>
+ </dependency>
+
</dependencies>
<repositories>
Added: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java?rev=722615&view=auto
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java (added)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java Tue Dec 2 12:54:32 2008
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.blaze;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannel;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannelFactory;
+import org.apache.activeblaze.cluster.BlazeClusterGroupConfiguration;
+import org.apache.activeblaze.cluster.MasterChangedListener;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberChangedListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ClusterStateManager;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
+/**
+ *
+ * @author rajdavies
+ * @org.apache.xbean.XBean element="blazeCluster"
+ */
+public class BlazeClusterStateManager extends BlazeClusterGroupConfiguration implements ClusterStateManager,
+ MemberChangedListener, MasterChangedListener {
+ private static final Log LOG = LogFactory.getLog(BlazeClusterStateManager.class);
+ private BlazeClusterGroupChannel channel;
+ private AtomicBoolean started = new AtomicBoolean();
+ private ClusterState clusterState;
+ private String localMemberName;
+ private PBClusterNodeStatus status;
+ final private List<ClusterListener> listeners = new CopyOnWriteArrayList<ClusterListener>();
+
+ /**
+ * @param listener
+ * @see org.apache.kahadb.replication.ClusterStateManager#addListener(org.apache.kahadb.replication.ClusterListener)
+ */
+ public void addListener(ClusterListener listener) {
+ this.listeners.add(listener);
+ initializeChannel();
+ fireClusterChange();
+ }
+
+ /**
+ * @param node
+ * @see org.apache.kahadb.replication.ClusterStateManager#addMember(java.lang.String)
+ */
+ public void addMember(String node) {
+ this.localMemberName = node;
+ initializeChannel();
+ }
+
+ /**
+ * @param listener
+ * @see org.apache.kahadb.replication.ClusterStateManager#removeListener(org.apache.kahadb.replication.ClusterListener)
+ */
+ public void removeListener(ClusterListener listener) {
+ this.listeners.remove(listener);
+ }
+
+ /**
+ * @param node
+ * @see org.apache.kahadb.replication.ClusterStateManager#removeMember(java.lang.String)
+ */
+ public void removeMember(String node) {
+ }
+
+ /**
+ * @param status
+ * @see org.apache.kahadb.replication.ClusterStateManager#setMemberStatus(org.apache.kahadb.replication.pb.PBClusterNodeStatus)
+ */
+ public void setMemberStatus(PBClusterNodeStatus status) {
+ if (status != null) {
+ this.status = status;
+ setMasterWeight(status.getState().getNumber());
+ if (this.channel != null) {
+ this.channel.getConfiguration().setMasterWeight(getMasterWeight());
+ try {
+ this.channel.waitForElection(getAwaitGroupTimeout());
+ } catch (Exception e) {
+ LOG.error("Wait for Election Failed");
+ }
+ }
+ processClusterStateChange();
+ }
+ }
+
+ /**
+ * @param arg0
+ * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
+ */
+ public void memberStarted(Member arg0) {
+ processClusterStateChange();
+ }
+
+ /**
+ * @param arg0
+ * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
+ */
+ public void memberStopped(Member arg0) {
+ processClusterStateChange();
+ }
+
+ /**
+ * @param arg0
+ * @see org.apache.activeblaze.cluster.MasterChangedListener#masterChanged(org.apache.activeblaze.group.Member)
+ */
+ public void masterChanged(Member arg0) {
+ processClusterStateChange();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.apache.activemq.Service#start()
+ */
+ public void start() throws Exception {
+ if (this.started.compareAndSet(false, true)) {
+ initializeChannel();
+ }
+ this.started.set(true);
+ }
+
+ /**
+ * @throws Exception
+ * @see org.apache.activemq.Service#stop()
+ */
+ public void stop() throws Exception {
+ if (this.started.compareAndSet(true, false)) {
+ if (this.channel != null) {
+ this.channel.removeMemberChangedListener(this);
+ this.channel.removeMasterChangedListener(this);
+ this.channel.shutDown();
+ this.channel = null;
+ }
+ }
+ this.started.set(false);
+ }
+
+ private boolean isStarted() {
+ return this.started.get();
+ }
+
+ synchronized private void updateClusterState(ClusterState clusterState) {
+ this.clusterState = clusterState;
+ fireClusterChange();
+ }
+
+ private void fireClusterChange() {
+ if (isStarted() && !this.listeners.isEmpty() && this.clusterState != null) {
+ for (ClusterListener listener : this.listeners) {
+ listener.onClusterChange(this.clusterState);
+ }
+ }
+ }
+
+ private void processClusterStateChange() {
+ if (isStarted()) {
+ try {
+ ClusterState state = new ClusterState();
+ this.channel.waitForElection(getAwaitGroupTimeout());
+ Set<Member> members = this.channel.getMembers();
+ Member master = this.channel.getMaster();
+ if (master != null) {
+ // check we can be the master
+ if (!this.channel.isMaster() || (this.status != null)) {
+ state.setMaster(master.getName());
+ members.remove(master);
+ }
+ }
+ List<String> slaves = new ArrayList<String>();
+ for (Member slave : members) {
+ slaves.add(slave.getName());
+ }
+ state.setSlaves(slaves);
+ updateClusterState(state);
+ } catch (Exception e) {
+ LOG.error("Failed to process Cluster State Changed", e);
+ }
+ }
+ }
+
+ private synchronized void initializeChannel() {
+ if (this.localMemberName != null && this.channel == null) {
+ try {
+ BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory(this);
+ this.channel = factory.createChannel(this.localMemberName);
+ this.channel.addMemberChangedListener(this);
+ this.channel.addMasterChangedListener(this);
+ if (isStarted()) {
+ this.channel.start();
+ this.channel.waitForElection(getAwaitGroupTimeout());
+ }
+ processClusterStateChange();
+ } catch (Exception e) {
+ LOG.error("Failed to create channel", e);
+ }
+ }
+ }
+}
Propchange: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java?rev=722615&view=auto
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java (added)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java Tue Dec 2 12:54:32 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.blaze;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.apache.activemq.util.Callback;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+
+/**
+ * @author rajdavies
+ *
+ */
+public class BlazeClusterStateManagerTest extends TestCase {
+ public void testTwoNodesGoingOnline() throws Exception {
+ final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+ final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();
+ final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
+ bcsm1.addListener(new ClusterListener() {
+ public void onClusterChange(ClusterState config) {
+ stateEvents1.add(config);
+ }
+ });
+ bcsm1.start();
+ bcsm1.addMember("kdbr://localhost:60001");
+ final BlazeClusterStateManager bcsm2 = new BlazeClusterStateManager();
+ bcsm2.addListener(new ClusterListener() {
+ public void onClusterChange(ClusterState config) {
+ stateEvents2.add(config);
+ }
+ });
+ bcsm2.start();
+ bcsm2.addMember("kdbr://localhost:60002");
+ // Drain the events..
+ while (stateEvents1.poll(100, TimeUnit.MILLISECONDS) != null) {
+ }
+ while (stateEvents2.poll(100, TimeUnit.MILLISECONDS) != null) {
+ }
+ // Bring node 1 online
+ final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
+ status1.setConnectUri("kdbr://localhost:60001");
+ status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
+ status1.setState(State.SLAVE_UNCONNECTED);
+ executeAsync(new Callback() {
+ public void execute() throws Exception {
+ bcsm1.setMemberStatus(status1);
+ }
+ });
+ // Bring node 2 online
+ final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
+ status2.setConnectUri("kdbr://localhost:60002");
+ status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
+ status2.setState(State.SLAVE_UNCONNECTED);
+ executeAsync(new Callback() {
+ public void execute() throws Exception {
+ Thread.sleep(1000);
+ bcsm2.setMemberStatus(status2);
+ }
+ });
+ ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
+ assertNotNull(state);
+ assertNotNull(state.getMaster());
+ assertEquals("kdbr://localhost:60002", state.getMaster());
+ assertTrue(state.getSlaves().size() == 1);
+ state = stateEvents2.poll(2, TimeUnit.SECONDS);
+ assertNotNull(state);
+ assertNotNull(state.getMaster());
+ assertEquals("kdbr://localhost:60002", state.getMaster());
+ assertTrue(state.getSlaves().size() == 1);
+ bcsm2.stop();
+ bcsm1.stop();
+ }
+
+ public void testOneNodeGoingOnline() throws Exception {
+ final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+ final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
+ bcsm1.addListener(new ClusterListener() {
+ public void onClusterChange(ClusterState config) {
+ stateEvents1.add(config);
+ }
+ });
+ bcsm1.start();
+ // Drain the events..
+ while (stateEvents1.poll(100, TimeUnit.MILLISECONDS) != null) {
+ }
+ // Let node1 join the cluster.
+ bcsm1.addMember("kdbr://localhost:60001");
+ ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
+ assertNotNull(state);
+ assertNull(state.getMaster());
+ assertTrue(state.getSlaves().size() == 1);
+ // Let the cluster know that node1 is online..
+ PBClusterNodeStatus status = new PBClusterNodeStatus();
+ status.setConnectUri("kdbr://localhost:60001");
+ status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
+ status.setState(State.SLAVE_UNCONNECTED);
+ bcsm1.setMemberStatus(status);
+ state = stateEvents1.poll(10, TimeUnit.SECONDS);
+ assertNotNull(state);
+ assertNotNull(state.getMaster());
+ assertEquals("kdbr://localhost:60001", state.getMaster());
+ assertTrue(state.getSlaves().isEmpty());
+ bcsm1.stop();
+ }
+
+ private void executeAsync(final Callback callback) {
+ new Thread("Async Test Task") {
+ @Override
+ public void run() {
+ try {
+ callback.execute();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ }
+}
Propchange: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java
------------------------------------------------------------------------------
svn:eol-style = native