You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/18 01:53:32 UTC

svn commit: r718453 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/replication/ main/java/org/apache/kahadb/replication/zk/ main/proto/ test/java/org/apache/kahadb/replication/zk/ test/resources/

Author: chirino
Date: Mon Nov 17 16:53:32 2008
New Revision: 718453

URL: http://svn.apache.org/viewvc?rev=718453&view=rev
Log:
Adding an initial pass of a ZooKeeper-based cluster manager.

Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
    activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
    activemq/sandbox/kahadb/src/test/resources/log4j.properties

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=718453&r1=718452&r2=718453&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Mon Nov 17 16:53:32 2008
@@ -18,7 +18,6 @@
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -27,8 +26,6 @@
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.zip.Adler32;
-import java.util.zip.Checksum;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.transport.Transport;
@@ -119,7 +116,7 @@
 		ReplicationFrame frame = new ReplicationFrame();
 		frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
 		PBJournalUpdate payload = new PBJournalUpdate();
-		payload.setLocation(convert(location));
+		payload.setLocation(ReplicationSupport.convert(location));
 		payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
 		frame.setPayload(payload);
 
@@ -285,7 +282,7 @@
 					info.setSnapshotId(snapshotId);
 					info.setStart(0);
 					info.setEnd(file.length());
-					info.setChecksum(copyAndChecksum(file, snapshotFile));
+					info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
 					snapshotInfos.add(info);
 					
 					rcPayload.setCopyFilesList(snapshotInfos);
@@ -345,43 +342,6 @@
 		}
 	}
 	
-	private PBJournalLocation convert(Location loc) {
-		if( loc==null ) {
-			return null;
-		}
-		return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
-	}
-
-	private long copyAndChecksum(File input, File output) throws IOException {
-		FileInputStream is = null;
-		FileOutputStream os = null;
-		try {
-			is = new FileInputStream(input);
-			os = new FileOutputStream(output);
-
-			byte buffer[] = new byte[1024 * 4];
-			int c;
-
-			Checksum checksum = new Adler32();
-			while ((c = is.read(buffer)) >= 0) {
-				os.write(buffer, 0, c);
-				checksum.update(buffer, 0, c);
-			}
-			return checksum.getValue();
-
-		} finally {
-			try {
-				is.close();
-			} catch(Throwable e) {
-			}
-			try {
-				os.close();
-			} catch(Throwable e) {
-			}
-		}
-	}
-
-
 
 	private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation journalLocation) {
 	}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=718453&r1=718452&r2=718453&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Mon Nov 17 16:53:32 2008
@@ -40,7 +40,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.DataFile;
-import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.replication.pb.PBFileInfo;
 import org.apache.kahadb.replication.pb.PBHeader;
@@ -293,18 +292,12 @@
 		if( onlineRecovery ) {
 			KahaDBStore store = replicationServer.getStore();
 			// Let the journal know that we appended to one of it's files..
-			store.getJournal().appendedExternally(convert(location), data.length);
+			store.getJournal().appendedExternally(ReplicationSupport.convert(location), data.length);
 			// Now incrementally recover those records.
 			store.incrementalRecover();
 		}
 	}
 	
-	private Location convert(PBJournalLocation location) {
-		Location rc = new Location();
-		rc.setDataFileId(location.getFileId());
-		rc.setOffset(location.getOffset());
-		return rc;
-	}
 	
 	private void commitBulkTransfer() {
 		try {

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java?rev=718453&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java Mon Nov 17 16:53:32 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+
+/**
+ * Static helpers shared by the replication master and slave: conversions between
+ * journal {@link Location}s and their protobuf form, and checksummed file copying.
+ */
+public class ReplicationSupport {
+
+    /**
+     * Converts a journal location into its protobuf representation.
+     *
+     * @param loc the journal location; may be null
+     * @return the protobuf location, or null when {@code loc} is null
+     */
+    public static PBJournalLocation convert(Location loc) {
+        if (loc == null) {
+            return null;
+        }
+        return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
+    }
+
+    /**
+     * Converts a protobuf journal location back into a journal {@link Location}.
+     * Null-tolerant like {@link #convert(Location)}, because protobuf location fields
+     * such as {@code PBClusterNodeStatus.last_update} are optional.
+     *
+     * @param location the protobuf location; may be null
+     * @return the journal location, or null when {@code location} is null
+     */
+    public static Location convert(PBJournalLocation location) {
+        if (location == null) {
+            return null;
+        }
+        Location rc = new Location();
+        rc.setDataFileId(location.getFileId());
+        rc.setOffset(location.getOffset());
+        return rc;
+    }
+
+    /**
+     * Copies {@code input} to {@code output} (overwriting it) and computes the Adler-32
+     * checksum of the bytes copied.
+     *
+     * @param input the file to read
+     * @param output the file to create or overwrite
+     * @return the Adler-32 checksum of the copied content
+     * @throws IOException if either file cannot be opened, read, written or closed
+     */
+    public static long copyAndChecksum(File input, File output) throws IOException {
+        FileInputStream is = new FileInputStream(input);
+        try {
+            FileOutputStream os = new FileOutputStream(output);
+            try {
+                byte[] buffer = new byte[1024 * 4];
+                Checksum checksum = new Adler32();
+                int c;
+                while ((c = is.read(buffer)) != -1) {
+                    os.write(buffer, 0, c);
+                    checksum.update(buffer, 0, c);
+                }
+                // Close explicitly on the success path so that a failing close (data that
+                // may not have reached the file) is reported instead of silently ignored.
+                os.close();
+                os = null;
+                return checksum.getValue();
+            } finally {
+                if (os != null) {
+                    try {
+                        os.close();
+                    } catch (IOException ignore) {
+                        // Already failing; keep the original exception.
+                    }
+                }
+            }
+        } finally {
+            try {
+                is.close();
+            } catch (IOException ignore) {
+                // Read side only; the copy result is unaffected.
+            }
+        }
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java?rev=718453&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java Mon Nov 17 16:53:32 2008
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.zk;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ClusterStateManager;
+import org.apache.kahadb.replication.ReplicationSupport;
+import org.apache.kahadb.replication.pb.PBClusterConfiguration;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class ZooKeeperClusterStateManager implements ClusterStateManager, Watcher {
+    private static final Log LOG = LogFactory.getLog(ZooKeeperClusterStateManager.class);
+
+    final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
+    private int startCounter;
+
+    private String uri = "zk://localhost:2181/activemq/ha-cluster/default";
+    String userid = "activemq";
+    String password = "";
+    
+    private ZooKeeper zk;
+    private String path;
+
+    ClusterState clusterState;
+    private String statusPath;
+    private PBClusterNodeStatus memberStatus;
+
+    private Thread takoverTask;
+
+    private boolean areWeTheBestMaster;
+
+    synchronized public void addListener(ClusterListener listener) {
+        listeners.add(listener);
+        fireClusterChange();
+    }
+
+    synchronized public void removeListener(ClusterListener listener) {
+        listeners.remove(listener);
+    }
+
+    synchronized private void updateClusterState(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        fireClusterChange();
+    }
+
+    synchronized private void fireClusterChange() {
+        if (startCounter > 0 && !listeners.isEmpty()) {
+            for (ClusterListener listener : listeners) {
+                listener.onClusterChange(clusterState);
+            }
+        }
+    }
+
+    synchronized public void start() throws Exception {
+        startCounter++;
+        if (startCounter == 1) {
+
+            // Make sure the path is set..
+            String path = getPath();
+
+            // Create a ZooKeeper connection..
+            zk = createZooKeeperConnection();
+
+            mkParentDirs(path);
+            try {
+                zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (NodeExistsException ignore) {
+            }
+            
+            processClusterStateChange();
+        }
+    }
+
+    public String getPath() {
+        if( path == null ) {
+            try {
+                URI uri = new URI(this.uri);
+                path = uri.getPath();
+                if (path == null) {
+                    throw new IllegalArgumentException("Invalid uri '" + uri + "', path to cluster configuration not specified");
+                }
+            } catch (URISyntaxException e) {
+                throw new IllegalArgumentException("Invalid uri '" + uri + "': "+e);
+            }
+        }
+        return path;
+    }
+
+    ZooKeeper createZooKeeperConnection() throws URISyntaxException, IOException {
+        // Parse out the configuration URI.
+        URI uri = new URI(this.uri);
+        if (!uri.getScheme().equals("zk")) {
+            throw new IllegalArgumentException("Invalid uri '" + uri + "', expected it to start with zk://");
+        }
+        String host = uri.getHost();
+        if (host == null) {
+            throw new IllegalArgumentException("Invalid uri '" + uri + "', host not specified");
+        }
+        int port = uri.getPort();
+        if (port == -1) {
+            port = 2181;
+        }
+
+        ZooKeeper zk = new ZooKeeper(host, port, this);
+        zk.addAuthInfo("digest", (userid+":"+password).getBytes());
+        return zk;
+    }
+
+    private void processClusterStateChange() {
+        try {
+            if( zk==null ) {
+                return;
+            }
+
+            byte[] data = zk.getData(path, new Watcher() {
+                public void process(WatchedEvent event) {
+                    processClusterStateChange();
+                }
+            }, new Stat());
+            PBClusterConfiguration config = new PBClusterConfiguration();
+            config.mergeUnframed(data);
+            
+            ClusterState state = new ClusterState();
+            HashSet<String> slaves = new HashSet<String>(config.getMembersList());
+            if( config.hasMaster() ) {
+                state.setMaster(config.getMaster());
+                slaves.remove(config.getMaster());
+            }
+            state.setSlaves(new ArrayList<String>(slaves));
+            updateClusterState(state);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    synchronized public void stop() throws Exception {
+        startCounter--;
+        if (startCounter == 0) {
+            zk.close();
+            zk = null;
+        }
+    }
+
+    public void process(WatchedEvent event) {
+        System.out.println("Got: " + event);
+    }
+
+    public void setMemberStatus(final PBClusterNodeStatus status) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
+        this.memberStatus = status;
+        if (statusPath == null) {
+            mkdirs(path + "/election");
+            statusPath = zk.create(path + "/election/n_", status.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+        } else {
+            Stat stat = zk.exists(statusPath, false);
+            if (status == null) {
+                zk.delete(statusPath, stat.getVersion());
+                statusPath = null;
+            } else {
+                zk.setData(statusPath, status.toUnframedByteArray(), stat.getVersion());
+            }
+        }
+        processElectionChange();
+    }
+
+    synchronized private void processElectionChange() {
+        try {
+            if( zk==null ) {
+                return;
+            }
+            List<String> zkNodes = zk.getChildren(path + "/election", new Watcher() {
+                public void process(WatchedEvent event) {
+                    processElectionChange();
+                }
+            });
+            Map<String, PBClusterNodeStatus> children = processNodeStatus(zkNodes);
+            
+            if( children.isEmpty() ) {
+                return;
+            }            
+            String firstNodeId = children.keySet().iterator().next();
+            
+            // If we are the first child?
+            if( firstNodeId.equals(statusPath) ) {
+                
+                // If we are master already no need to do anything else
+                if ( memberStatus.getConnectUri().equals(clusterState.getMaster()) ) {
+                    return;
+                }
+            
+                // We may need to wait till a little to figure out if we are
+                // actually the best pick to be the master.
+                switch (memberStatus.getState()) {
+                case MASTER:                        
+                case SLAVE_ONLINE:
+                    // Can transition to master immediately
+                    LOG.info("Online salve taking over as master.");
+                        setMaster(memberStatus.getConnectUri());
+                        return;
+   
+                    case SLAVE_SYNCRONIZING:
+                    case SLAVE_UNCONNECTED:                        
+                        
+                        // If it looks like we are the best master.. lets wait 5 secs to
+                    // let other slaves
+                    // join the cluster and get a chance to take over..
+                    if (areWeTheBestMaster(children)) {
+                        
+                        areWeTheBestMaster = true;
+                        if( takoverTask==null ) {
+                            LOG.info(memberStatus.getConnectUri()+" looks like the best offline slave that can take over as master.. waiting 5 secs to allow another slave to take over.");
+                            
+                            takoverTask = new Thread("Slave takeover..") {
+                                public void run() {
+                                    takoverAttempt();
+                                }
+                            };
+                            takoverTask.setDaemon(true);
+                            takoverTask.start();
+                        }
+                        return;
+                        
+                    } else {
+                        if( areWeTheBestMaster ) {
+                            LOG.info(memberStatus.getConnectUri()+" no longer looks like the best offline slave that can take over as master.");
+                        }
+                        
+                        areWeTheBestMaster = false;
+                        
+                        // If we get here we need to yield our top position in the node
+                        // sequence list so that the better
+                        // slave can become the master.
+                        Stat stat = zk.exists(statusPath, false);
+                        zk.delete(statusPath, stat.getVersion());
+                        statusPath = zk.create(path + "/election/n_", memberStatus.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected void takoverAttempt() {
+        try {
+            Thread.sleep(5 * 1000);
+            synchronized(this) {
+                try {
+                    if( areWeTheBestMaster ) {
+                        LOG.info(memberStatus.getConnectUri()+" is taking over as master.");
+                        setMaster(memberStatus.getConnectUri());
+                    }
+                } finally {
+                    // We want to make sure we set takoverTask to null in the same mutex as we set the master.
+                    takoverTask=null;
+                }
+            }
+        } catch (Exception e) {
+        } finally {
+            // sleep might error out..
+            synchronized(this) {
+                takoverTask=null;
+            }
+        }
+    }
+
+    private boolean areWeTheBestMaster(Map<String, PBClusterNodeStatus> children) {
+        Location ourLocation = ReplicationSupport.convert(memberStatus.getLastUpdate());
+        for (Entry<String, PBClusterNodeStatus> entry : children.entrySet()) {
+            PBClusterNodeStatus status = entry.getValue();
+            switch (status.getState()) {
+            case MASTER:
+            case SLAVE_ONLINE:
+                return false;
+
+            case SLAVE_SYNCRONIZING:
+            case SLAVE_UNCONNECTED:
+                if (ourLocation.compareTo(ReplicationSupport.convert(status.getLastUpdate())) < 0) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private Map<String, PBClusterNodeStatus> processNodeStatus(List<String> children) throws KeeperException, InterruptedException, InvalidProtocolBufferException {
+        java.util.TreeMap<String, PBClusterNodeStatus> rc = new java.util.TreeMap<String, PBClusterNodeStatus>();
+        for (String nodeId : children) {
+            try {
+                Stat stat = new Stat();
+                byte[] data = zk.getData(path + "/election/" + nodeId, false, stat);
+                PBClusterNodeStatus status = new PBClusterNodeStatus();
+                status.mergeUnframed(data);
+                rc.put(path + "/election/" + nodeId, status);
+            } catch (NoNodeException ignore) {
+            }
+        }
+        return rc;
+    }
+
+    public void addMember(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
+        mkParentDirs(path);
+        update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+            public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                PBClusterConfiguration config = new PBClusterConfiguration();
+                if (data != null) {
+                    config.mergeUnframed(data);
+                }
+                if (!config.getMembersList().contains(node)) {
+                    config.addMembers(node);
+                }
+                return config.toFramedByteArray();
+            }
+        });
+    }
+
+    public void setMaster(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
+        mkParentDirs(path);
+        update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+            public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                PBClusterConfiguration config = new PBClusterConfiguration();
+                if (data != null) {
+                    config.mergeUnframed(data);
+                }
+                config.setMaster(node);
+                return config.toFramedByteArray();
+            }
+        });
+    }
+
+    interface Updater<T extends Throwable> {
+        byte[] update(byte[] data) throws T;
+    }
+
+    private <T extends Throwable> void update(String path, CreateMode persistent, Updater<T> updater) throws InvalidProtocolBufferException, KeeperException, InterruptedException, T {
+        Stat stat = zk.exists(path, false);
+        if (stat != null) {
+            byte[] data = zk.getData(path, false, stat);
+            data = updater.update(data);
+            zk.setData(path, data, stat.getVersion());
+        } else {
+            byte[] update = updater.update(null);
+            try {
+                zk.create(path, update, Ids.OPEN_ACL_UNSAFE, persistent);
+            } catch (NodeExistsException ignore) {
+                stat = zk.exists(path, false);
+                byte[] data = zk.getData(path, false, stat);
+                data = updater.update(data);
+                zk.setData(path, data, stat.getVersion());
+            }
+        }
+    }
+
+    private void mkParentDirs(String path) throws KeeperException, InterruptedException {
+        int lastIndexOf = path.lastIndexOf("/");
+        if (lastIndexOf >= 0) {
+            mkdirs(path.substring(0, lastIndexOf));
+        }
+    }
+
+    private void mkdirs(String path) throws KeeperException, InterruptedException {
+        if (zk.exists(path, false) != null) {
+            return;
+        }
+        // Remove the leading /
+        if (path.startsWith("/")) {
+            path = path.substring(1);
+        }
+        String[] split = path.split("/");
+        String cur = "";
+        for (String node : split) {
+            cur += "/" + node;
+            try {
+                zk.create(cur, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (NodeExistsException ignore) {
+            }
+        }
+    }
+    
+}

Modified: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=718453&r1=718452&r2=718453&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Mon Nov 17 16:53:32 2008
@@ -96,6 +96,7 @@
 	// The files that the slave should delete
 	repeated string delete_files=2;
 }
+
 message PBJournalUpdate {
     // Journal location of the update.
     required PBJournalLocation location=1;
@@ -108,3 +109,39 @@
     optional bool disk_sync=4;
 }
 
+//
+// Cluster-wide configuration shared by all members of a replication cluster:
+// which nodes belong to the cluster and which one was last elected master.
+// (ZooKeeperClusterStateManager stores it in the cluster's configuration znode.)
+//
+message PBClusterConfiguration {
+    // Intended to hold a cluster-wide broker configuration (for example the Spring
+    // config), so that configuration changes could be pushed out to every member
+    // of the cluster from one place.
+    optional bytes broker_configuration=1;
+    // The nodes that have joined the cluster; they may not all be online.
+    // Useful for checking whether enough members are online to form a quorum.
+    repeated string members=2;
+    // The connect URI of the last elected master.
+    optional string master=3;
+}
+
+// Status that each cluster member publishes for master election
+// (ZooKeeperClusterStateManager writes it to an ephemeral sequential
+// node under "<cluster path>/election").
+message PBClusterNodeStatus {
+
+	// Replication role/progress of a node. Value 2 is not used.
+	enum State {
+		// When the slave initially starts up it
+		// is not connected to a master.
+		SLAVE_UNCONNECTED = 0;
+		// When the slave first attaches to a master, it must first
+		// synchronize with the master to get any data updates
+		// that were missed while it was offline.
+		// (The spelling of this name is part of the generated Java API.)
+		SLAVE_SYNCRONIZING = 1;
+		// The slave is caught up and is only actively applying
+		// real time updates from the master.
+		SLAVE_ONLINE = 3;
+		// This node is the master.
+		MASTER = 4;
+	}
+
+    // Current state of the node.
+    required State state=1;
+    // URI this node is reached at; this is the value recorded as the cluster master.
+    optional string connect_uri=2;
+    // Most recent journal location known to this node (presumably the last update it
+    // applied); compared between offline slaves to pick the most up-to-date one.
+    optional PBJournalLocation last_update=3;
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java?rev=718453&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java Mon Nov 17 16:53:32 2008
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.zk;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.util.Callback;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+/**
+ * Tests {@link ZooKeeperClusterStateManager} against an in-process ZooKeeper
+ * server that is started in {@link #setUp()} and shut down in {@link #tearDown()}.
+ */
+public class ZooKeeperClusterStateManagerTest extends TestCase {
+
+    /** Client port of the embedded ZooKeeper server. */
+    private static final int PORT = 2181;
+    /** First cluster state manager; created in setUp, started by each test. */
+    private ZooKeeperClusterStateManager zkcsm1;
+    /** Raw ZooKeeper client used by the fixture to clean up old znodes. */
+    private ZooKeeper zk;
+    /** Connection factory of the embedded server. */
+    private Factory serverFactory;
+
+    /**
+     * Polls a ZooKeeper server with the "stat" command until it answers with a
+     * "Zookeeper version:" line or the timeout expires.
+     *
+     * @param host    server host name
+     * @param port    server client port
+     * @param timeout maximum time to keep retrying, in milliseconds
+     * @return true if the server answered in time; false on timeout or if the
+     *         calling thread was interrupted (the interrupt flag is preserved)
+     */
+    public static boolean waitForServerUp(String host, int port, long timeout) {
+        long start = System.currentTimeMillis();
+        while (true) {
+            try {
+                Socket sock = new Socket(host, port);
+                BufferedReader reader = null;
+                try {
+                    OutputStream outstream = sock.getOutputStream();
+                    // The command and its reply are plain ASCII; don't depend on the platform charset.
+                    outstream.write("stat".getBytes("US-ASCII"));
+                    outstream.flush();
+
+                    reader = new BufferedReader(
+                            new InputStreamReader(sock.getInputStream(), "US-ASCII"));
+                    String line = reader.readLine();
+                    if (line != null && line.startsWith("Zookeeper version:")) {
+                        return true;
+                    }
+                } finally {
+                    // Close the reader first, and always release the socket.
+                    try {
+                        if (reader != null) {
+                            reader.close();
+                        }
+                    } finally {
+                        sock.close();
+                    }
+                }
+            } catch (IOException e) {
+                // Server not reachable or not answering yet: fall through and retry.
+            }
+
+            if (System.currentTimeMillis() > start + timeout) {
+                break;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller and stop waiting.
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Starts an embedded ZooKeeper server on {@link #PORT}, creates the first
+     * cluster state manager and removes znodes left over from a previous run.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        ServerStats.registerAsConcrete();
+
+        File tmpDir = new File("target/test-data/zookeeper");
+        tmpDir.mkdirs();
+
+        // Reduces startup time..
+        System.setProperty("zookeeper.preAllocSize", "100");
+        FileTxnLog.setPreallocSize(100);
+
+        ZooKeeperServer zs = new ZooKeeperServer(tmpDir, tmpDir, 3000);
+
+        serverFactory = new NIOServerCnxn.Factory(PORT);
+        serverFactory.startup(zs);
+
+        // NOTE(review): readiness check disabled in the original; confirm whether it can be re-enabled.
+//        assertTrue("waiting for server up", waitForServerUp("localhost", PORT, 1000*5));
+
+        zkcsm1 = new ZooKeeperClusterStateManager();
+        zk = zkcsm1.createZooKeeperConnection();
+        // Cleanup after previous run...
+        zkRecursiveDelete(zkcsm1.getPath());
+    }
+
+    /**
+     * Deletes the znode at {@code path} and all of its descendants, if it exists.
+     */
+    private void zkRecursiveDelete(String path) throws KeeperException, InterruptedException {
+        Stat stat = zk.exists(path, false);
+        if (stat == null) {
+            return;
+        }
+        if (stat.getNumChildren() > 0) {
+            List<String> children = zk.getChildren(path, false);
+            for (String node : children) {
+                zkRecursiveDelete(path + "/" + node);
+            }
+        }
+        zk.delete(path, stat.getVersion());
+    }
+
+    /**
+     * Closes the fixture client and shuts down the embedded server. The server is
+     * shut down even if closing the client fails, so the port is released for
+     * the next test.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            zk.close();
+        } finally {
+            try {
+                serverFactory.shutdown();
+            } finally {
+                ServerStats.unregister();
+            }
+        }
+    }
+
+    /**
+     * Discards queued events until none arrives within 100 ms.
+     */
+    private static void drain(LinkedBlockingQueue<ClusterState> events) throws InterruptedException {
+        while (events.poll(100, TimeUnit.MILLISECONDS) != null) {
+            // discard
+        }
+    }
+
+    public void testTwoNodesGoingOnline() throws Exception {
+        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+        final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();
+
+        zkcsm1.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents1.add(config);
+            }
+        });
+        zkcsm1.start();
+        try {
+            zkcsm1.addMember("kdbr://localhost:60001");
+
+            final ZooKeeperClusterStateManager zkcsm2 = new ZooKeeperClusterStateManager();
+            zkcsm2.addListener(new ClusterListener() {
+                public void onClusterChange(ClusterState config) {
+                    stateEvents2.add(config);
+                }
+            });
+            zkcsm2.start();
+            try {
+                zkcsm2.addMember("kdbr://localhost:60002");
+
+                // Drain the events caused by the membership changes.
+                drain(stateEvents1);
+                drain(stateEvents2);
+
+                // Bring node 1 online
+                final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
+                status1.setConnectUri("kdbr://localhost:60001");
+                status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
+                status1.setState(State.SLAVE_UNCONNECTED);
+
+                executeAsync(new Callback() {
+                    public void execute() throws Exception {
+                        zkcsm1.setMemberStatus(status1);
+                    }
+                });
+
+                // Bring node 2 online
+                final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
+                status2.setConnectUri("kdbr://localhost:60002");
+                status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
+                status2.setState(State.SLAVE_UNCONNECTED);
+
+                executeAsync(new Callback() {
+                    public void execute() throws Exception {
+                        Thread.sleep(1000);
+                        zkcsm2.setMemberStatus(status2);
+                    }
+                });
+
+                // Node 2's status has the later journal location, so it is expected to be master.
+                ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
+                assertNotNull(state);
+                assertNotNull(state.getMaster());
+                assertEquals("kdbr://localhost:60002", state.getMaster());
+                assertEquals(1, state.getSlaves().size());
+
+                state = stateEvents2.poll(1, TimeUnit.SECONDS);
+                assertNotNull(state);
+                assertNotNull(state.getMaster());
+                assertEquals("kdbr://localhost:60002", state.getMaster());
+                assertEquals(1, state.getSlaves().size());
+            } finally {
+                // Stop even when an assertion fails so the ZooKeeper session is not leaked.
+                zkcsm2.stop();
+            }
+        } finally {
+            zkcsm1.stop();
+        }
+    }
+
+    /**
+     * Runs the callback on a new thread. Failures are only printed; the test
+     * then sees them as missing cluster events (a poll timeout).
+     */
+    private void executeAsync(final Callback callback) {
+        new Thread("Async Test Task") {
+            @Override
+            public void run() {
+                try {
+                    callback.execute();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+    }
+
+    public void testOneNodeGoingOnline() throws Exception {
+        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+        zkcsm1.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents1.add(config);
+            }
+        });
+        zkcsm1.start();
+        try {
+            // Drain the events..
+            drain(stateEvents1);
+
+            // Let node1 join the cluster.
+            zkcsm1.addMember("kdbr://localhost:60001");
+
+            ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
+            assertNotNull(state);
+            assertNull(state.getMaster());
+            assertEquals(1, state.getSlaves().size());
+
+            // Let the cluster know that node1 is online..
+            PBClusterNodeStatus status = new PBClusterNodeStatus();
+            status.setConnectUri("kdbr://localhost:60001");
+            status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
+            status.setState(State.SLAVE_UNCONNECTED);
+            zkcsm1.setMemberStatus(status);
+
+            state = stateEvents1.poll(10, TimeUnit.SECONDS);
+            assertNotNull(state);
+            assertNotNull(state.getMaster());
+            assertEquals("kdbr://localhost:60001", state.getMaster());
+            assertTrue(state.getSlaves().isEmpty());
+        } finally {
+            // Stop even when an assertion fails so the ZooKeeper session is not leaked.
+            zkcsm1.stop();
+        }
+    }
+}

Modified: activemq/sandbox/kahadb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/log4j.properties?rev=718453&r1=718452&r2=718453&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/log4j.properties (original)
+++ activemq/sandbox/kahadb/src/test/resources/log4j.properties Mon Nov 17 16:53:32 2008
@@ -21,6 +21,7 @@
 log4j.rootLogger=INFO, stdout
 
 log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.zookeeper=WARN
 
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender