You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/12/02 21:54:32 UTC

svn commit: r722615 - in /activemq/trunk/kahadb: ./ src/main/java/org/apache/kahadb/replication/blaze/ src/test/java/org/apache/kahadb/replication/blaze/

Author: rajdavies
Date: Tue Dec  2 12:54:32 2008
New Revision: 722615

URL: http://svn.apache.org/viewvc?rev=722615&view=rev
Log:
Add activeBlaze replication cluster state manager

Added:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java   (with props)
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java   (with props)
Modified:
    activemq/trunk/kahadb/pom.xml

Modified: activemq/trunk/kahadb/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/pom.xml?rev=722615&r1=722614&r2=722615&view=diff
==============================================================================
--- activemq/trunk/kahadb/pom.xml (original)
+++ activemq/trunk/kahadb/pom.xml Tue Dec  2 12:54:32 2008
@@ -105,7 +105,13 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-        
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activeblaze</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <optional>true</optional>
+    </dependency>
+     
   </dependencies>
   
   <repositories>

Added: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java?rev=722615&view=auto
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java (added)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java Tue Dec  2 12:54:32 2008
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.blaze;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannel;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannelFactory;
+import org.apache.activeblaze.cluster.BlazeClusterGroupConfiguration;
+import org.apache.activeblaze.cluster.MasterChangedListener;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberChangedListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ClusterStateManager;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
+/**
+ * 
+ * @author rajdavies
+ * @org.apache.xbean.XBean element="blazeCluster"
+ */
+public class BlazeClusterStateManager extends BlazeClusterGroupConfiguration implements ClusterStateManager,
+        MemberChangedListener, MasterChangedListener {
+    private static final Log LOG = LogFactory.getLog(BlazeClusterStateManager.class);
+    private BlazeClusterGroupChannel channel;
+    private AtomicBoolean started = new AtomicBoolean();
+    private ClusterState clusterState;
+    private String localMemberName;
+    private PBClusterNodeStatus status;
+    final private List<ClusterListener> listeners = new CopyOnWriteArrayList<ClusterListener>();
+
+    /**
+     * @param listener listener to register; a cluster-change notification is attempted right after registration
+     * @see org.apache.kahadb.replication.ClusterStateManager#addListener(org.apache.kahadb.replication.ClusterListener)
+     */
+    public void addListener(ClusterListener listener) {
+        this.listeners.add(listener);
+        initializeChannel(); // does nothing unless a local member name is set and no channel exists yet
+        fireClusterChange(); // goes to every registered listener, not only the one just added
+    }
+
+    /**
+     * @param node stored as the local member name; the channel is created under this name if none exists yet
+     * @see org.apache.kahadb.replication.ClusterStateManager#addMember(java.lang.String)
+     */
+    public void addMember(String node) {
+        this.localMemberName = node;
+        initializeChannel();
+    }
+
+    /**
+     * @param listener listener to remove from the notification list; the channel itself is left untouched
+     * @see org.apache.kahadb.replication.ClusterStateManager#removeListener(org.apache.kahadb.replication.ClusterListener)
+     */
+    public void removeListener(ClusterListener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * @param node ignored: this implementation is empty and leaves the channel and local member name as they are
+     * @see org.apache.kahadb.replication.ClusterStateManager#removeMember(java.lang.String)
+     */
+    public void removeMember(String node) {
+    }
+
+    /**
+     * @param status
+     * @see org.apache.kahadb.replication.ClusterStateManager#setMemberStatus(org.apache.kahadb.replication.pb.PBClusterNodeStatus)
+     */
+    public void setMemberStatus(PBClusterNodeStatus status) {
+        if (status != null) {
+            this.status = status;
+            setMasterWeight(status.getState().getNumber());
+            if (this.channel != null) {
+                this.channel.getConfiguration().setMasterWeight(getMasterWeight());
+                try {
+                    this.channel.waitForElection(getAwaitGroupTimeout());
+                } catch (Exception e) {
+                    LOG.error("Wait for Election Failed");
+                }
+            }
+            processClusterStateChange();
+        }
+    }
+
+    /**
+     * @param arg0 member passed in by the channel; unused, the cluster state is recomputed from the channel instead
+     * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
+     */
+    public void memberStarted(Member arg0) {
+        processClusterStateChange();
+    }
+
+    /**
+     * @param arg0 member passed in by the channel; unused, the cluster state is recomputed from the channel instead
+     * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
+     */
+    public void memberStopped(Member arg0) {
+        processClusterStateChange();
+    }
+
+    /**
+     * @param arg0 member passed in by the channel (presumably the new master); unused, state is recomputed instead
+     * @see org.apache.activeblaze.cluster.MasterChangedListener#masterChanged(org.apache.activeblaze.group.Member)
+     */
+    public void masterChanged(Member arg0) {
+        processClusterStateChange();
+    }
+
+    /**
+     * Idempotent: only the call that flips the flag from stopped to started runs channel initialization.
+     * @throws Exception
+     * @see org.apache.activemq.Service#start()
+     */
+    public void start() throws Exception {
+        if (this.started.compareAndSet(false, true)) {
+            initializeChannel();
+        }
+    }
+
+    /**
+     * Idempotent: only the call that flips the flag from started to stopped tears the channel down.
+     * @throws Exception
+     * @see org.apache.activemq.Service#stop()
+     */
+    public void stop() throws Exception {
+        if (this.started.compareAndSet(true, false)) {
+            if (this.channel != null) {
+                this.channel.removeMemberChangedListener(this);
+                this.channel.removeMasterChangedListener(this);
+                this.channel.shutDown();
+                this.channel = null;
+            }
+        }
+    }
+
+    private boolean isStarted() {
+        return this.started.get(); // flag is flipped by start() and stop()
+    }
+
+    synchronized private void updateClusterState(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        fireClusterChange(); // listeners are invoked while this object's monitor is held
+    }
+
+    private void fireClusterChange() {
+        if (isStarted() && !this.listeners.isEmpty() && this.clusterState != null) { // silent until started, observed and populated
+            for (ClusterListener listener : this.listeners) {
+                listener.onClusterChange(this.clusterState); // the field is re-read for each listener
+            }
+        }
+    }
+
+    private void processClusterStateChange() {
+        if (isStarted()) {
+            try {
+                ClusterState state = new ClusterState();
+                this.channel.waitForElection(getAwaitGroupTimeout());
+                Set<Member> members = this.channel.getMembers();
+                Member master = this.channel.getMaster();
+                if (master != null) {
+                    // check we can be the master
+                    if (!this.channel.isMaster() || (this.status != null)) {
+                        state.setMaster(master.getName());
+                        members.remove(master);
+                    }
+                }
+                List<String> slaves = new ArrayList<String>();
+                for (Member slave : members) {
+                    slaves.add(slave.getName());
+                }
+                state.setSlaves(slaves);
+                updateClusterState(state);
+            } catch (Exception e) {
+                LOG.error("Failed to process Cluster State Changed", e);
+            }
+        }
+    }
+
+    private synchronized void initializeChannel() {
+        if (this.localMemberName != null && this.channel == null) { // needs a name from addMember(); never replaces a channel
+            try {
+                BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory(this); // this object is the configuration
+                this.channel = factory.createChannel(this.localMemberName);
+                this.channel.addMemberChangedListener(this);
+                this.channel.addMasterChangedListener(this);
+                if (isStarted()) { // NOTE(review): a channel created before start() is not visibly started later - confirm
+                    this.channel.start();
+                    this.channel.waitForElection(getAwaitGroupTimeout());
+                }
+                processClusterStateChange(); // does nothing while not started
+            } catch (Exception e) {
+                LOG.error("Failed to create channel", e); // NOTE(review): the channel field may stay set after a partial failure
+            }
+        }
+    }
+}

Propchange: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java?rev=722615&view=auto
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java (added)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java Tue Dec  2 12:54:32 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.blaze;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.apache.activemq.util.Callback;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+
+/**
+ * @author rajdavies
+ * 
+ */
+public class BlazeClusterStateManagerTest extends TestCase {
+    public void testTwoNodesGoingOnline() throws Exception {
+        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+        final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();
+        final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
+        bcsm1.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents1.add(config);
+            }
+        });
+        bcsm1.start();
+        bcsm1.addMember("kdbr://localhost:60001");
+        final BlazeClusterStateManager bcsm2 = new BlazeClusterStateManager();
+        bcsm2.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents2.add(config);
+            }
+        });
+        bcsm2.start();
+        bcsm2.addMember("kdbr://localhost:60002");
+        // Drain the events queued while the two nodes joined, before any status is set
+        while (stateEvents1.poll(100, TimeUnit.MILLISECONDS) != null) {
+        }
+        while (stateEvents2.poll(100, TimeUnit.MILLISECONDS) != null) {
+        }
+        // Bring node 1 online
+        final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
+        status1.setConnectUri("kdbr://localhost:60001");
+        status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
+        status1.setState(State.SLAVE_UNCONNECTED);
+        executeAsync(new Callback() {
+            public void execute() throws Exception {
+                bcsm1.setMemberStatus(status1);
+            }
+        });
+        // Bring node 2 online
+        final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
+        status2.setConnectUri("kdbr://localhost:60002");
+        status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
+        status2.setState(State.SLAVE_UNCONNECTED);
+        executeAsync(new Callback() {
+            public void execute() throws Exception {
+                Thread.sleep(1000);
+                bcsm2.setMemberStatus(status2);
+            }
+        });
+        ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
+        assertNotNull(state);
+        assertNotNull(state.getMaster());
+        assertEquals("kdbr://localhost:60002", state.getMaster());
+        assertEquals(1, state.getSlaves().size());
+        state = stateEvents2.poll(2, TimeUnit.SECONDS);
+        assertNotNull(state);
+        assertNotNull(state.getMaster());
+        assertEquals("kdbr://localhost:60002", state.getMaster());
+        assertEquals(1, state.getSlaves().size());
+        bcsm2.stop();
+        bcsm1.stop();
+    }
+
+    public void testOneNodeGoingOnline() throws Exception {
+        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+        final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
+        bcsm1.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents1.add(config);
+            }
+        });
+        bcsm1.start();
+        // Drain the events..
+        while (stateEvents1.poll(100, TimeUnit.MILLISECONDS) != null) {
+        }
+        // Let node1 join the cluster.
+        bcsm1.addMember("kdbr://localhost:60001");
+        ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
+        assertNotNull(state);
+        assertNull(state.getMaster());
+        assertEquals(1, state.getSlaves().size());
+        // Let the cluster know that node1 is online..
+        PBClusterNodeStatus status = new PBClusterNodeStatus();
+        status.setConnectUri("kdbr://localhost:60001");
+        status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
+        status.setState(State.SLAVE_UNCONNECTED);
+        bcsm1.setMemberStatus(status);
+        state = stateEvents1.poll(10, TimeUnit.SECONDS);
+        assertNotNull(state);
+        assertNotNull(state.getMaster());
+        assertEquals("kdbr://localhost:60001", state.getMaster());
+        assertTrue(state.getSlaves().isEmpty());
+        bcsm1.stop();
+    }
+
+    private void executeAsync(final Callback callback) {
+        new Thread("Async Test Task") { // one new thread per callback; the thread is started and never joined
+            @Override
+            public void run() {
+                try {
+                    callback.execute();
+                } catch (Exception e) {
+                    e.printStackTrace(); // NOTE(review): a failure here is only printed; it does not fail the calling test
+                }
+            }
+        }.start();
+    }
+}

Propchange: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native