Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/11 20:40:02 UTC

svn commit: r889781 [2/3] - in /activemq/sandbox/activemq-apollo/activemq-kahadb-replication: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/kahadb/ src/main/java/org/apache/kahadb/replication/ sr...

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.blaze;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannel;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannelFactory;
+import org.apache.activeblaze.cluster.BlazeClusterGroupConfiguration;
+import org.apache.activeblaze.cluster.MasterChangedListener;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberChangedListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ClusterStateManager;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
+/**
+ * 
+ * @author rajdavies
+ * @org.apache.xbean.XBean element="blazeCluster"
+ */
+public class BlazeClusterStateManager extends BlazeClusterGroupConfiguration implements ClusterStateManager,
+        MemberChangedListener, MasterChangedListener {
+    private static final Log LOG = LogFactory.getLog(BlazeClusterStateManager.class);
+    private BlazeClusterGroupChannel channel;
+    private AtomicBoolean started = new AtomicBoolean();
+    private ClusterState clusterState;
+    private String localMemberName;
+    private PBClusterNodeStatus status;
+    final private List<ClusterListener> listeners = new CopyOnWriteArrayList<ClusterListener>();
+
+    /**
+     * @param listener
+     * @see org.apache.kahadb.replication.ClusterStateManager#addListener(org.apache.kahadb.replication.ClusterListener)
+     */
+    public void addListener(ClusterListener listener) {
+        this.listeners.add(listener);
+        initializeChannel();
+        fireClusterChange();
+    }
+
+    /**
+     * @param node
+     * @see org.apache.kahadb.replication.ClusterStateManager#addMember(java.lang.String)
+     */
+    public void addMember(String node) {
+        this.localMemberName = node;
+        initializeChannel();
+    }
+
+    /**
+     * @param listener
+     * @see org.apache.kahadb.replication.ClusterStateManager#removeListener(org.apache.kahadb.replication.ClusterListener)
+     */
+    public void removeListener(ClusterListener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * @param node
+     * @see org.apache.kahadb.replication.ClusterStateManager#removeMember(java.lang.String)
+     */
+    public void removeMember(String node) {
+    }
+
+    /**
+     * @param status
+     * @see org.apache.kahadb.replication.ClusterStateManager#setMemberStatus(org.apache.kahadb.replication.pb.PBClusterNodeStatus)
+     */
+    public void setMemberStatus(PBClusterNodeStatus status) {
+        if (status != null) {
+            this.status = status;
+            setMasterWeight(status.getState().getNumber());
+            if (this.channel != null) {
+                this.channel.getConfiguration().setMasterWeight(getMasterWeight());
+                try {
+                    this.channel.waitForElection(getAwaitGroupTimeout());
+                } catch (Exception e) {
+                    LOG.error("Wait for election failed", e);
+                }
+            }
+            processClusterStateChange();
+        }
+    }
+
+    /**
+     * @param arg0
+     * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
+     */
+    public void memberStarted(Member arg0) {
+        processClusterStateChange();
+    }
+
+    /**
+     * @param arg0
+     * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
+     */
+    public void memberStopped(Member arg0) {
+        processClusterStateChange();
+    }
+
+    /**
+     * @param arg0
+     * @see org.apache.activeblaze.cluster.MasterChangedListener#masterChanged(org.apache.activeblaze.group.Member)
+     */
+    public void masterChanged(Member arg0) {
+        processClusterStateChange();
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activemq.Service#start()
+     */
+    public void start() throws Exception {
+        if (this.started.compareAndSet(false, true)) {
+            initializeChannel();
+        }
+        this.started.set(true);
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activemq.Service#stop()
+     */
+    public void stop() throws Exception {
+        if (this.started.compareAndSet(true, false)) {
+            if (this.channel != null) {
+                this.channel.removeMemberChangedListener(this);
+                this.channel.removeMasterChangedListener(this);
+                this.channel.shutDown();
+                this.channel = null;
+            }
+        }
+        this.started.set(false);
+    }
+
+    private boolean isStarted() {
+        return this.started.get();
+    }
+
+    synchronized private void updateClusterState(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        fireClusterChange();
+    }
+
+    private void fireClusterChange() {
+        if (isStarted() && !this.listeners.isEmpty() && this.clusterState != null) {
+            for (ClusterListener listener : this.listeners) {
+                listener.onClusterChange(this.clusterState);
+            }
+        }
+    }
+
+    private void processClusterStateChange() {
+        if (isStarted()) {
+            try {
+                ClusterState state = new ClusterState();
+                this.channel.waitForElection(getAwaitGroupTimeout());
+                Set<Member> members = this.channel.getMembers();
+                Member master = this.channel.getMaster();
+                if (master != null) {
+                    // check we can be the master
+                    if (!this.channel.isMaster() || (this.status != null)) {
+                        state.setMaster(master.getName());
+                        members.remove(master);
+                    }
+                }
+                List<String> slaves = new ArrayList<String>();
+                for (Member slave : members) {
+                    slaves.add(slave.getName());
+                }
+                state.setSlaves(slaves);
+                updateClusterState(state);
+            } catch (Exception e) {
+                LOG.error("Failed to process cluster state change", e);
+            }
+        }
+    }
+
+    private synchronized void initializeChannel() {
+        if (this.localMemberName != null && this.channel == null) {
+            try {
+                BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory(this);
+                this.channel = factory.createChannel(this.localMemberName);
+                this.channel.addMemberChangedListener(this);
+                this.channel.addMasterChangedListener(this);
+                if (isStarted()) {
+                    this.channel.start();
+                    this.channel.waitForElection(getAwaitGroupTimeout());
+                }
+                processClusterStateChange();
+            } catch (Exception e) {
+                LOG.error("Failed to create channel", e);
+            }
+        }
+    }
+}
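
A minimal programmatic wiring of the class above, using only the API visible in this commit (addMember, addListener, start). It is a sketch: it assumes ClusterListener declares just the onClusterChange callback used here, and the node name is a placeholder:

    import org.apache.kahadb.replication.ClusterListener;
    import org.apache.kahadb.replication.ClusterState;
    import org.apache.kahadb.replication.blaze.BlazeClusterStateManager;

    public class BlazeClusterExample {
        public static void main(String[] args) throws Exception {
            BlazeClusterStateManager cluster = new BlazeClusterStateManager();
            // The node name is also used as the local ActiveBlaze channel name.
            cluster.addMember("kdbr://localhost:60001");
            cluster.addListener(new ClusterListener() {
                public void onClusterChange(ClusterState state) {
                    // Fired whenever the elected master or the membership changes.
                    System.out.println("Cluster master is now: " + state.getMaster());
                }
            });
            cluster.start();
        }
    }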

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.util.Map;
+
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * A transport factory for the KahaDB replication (KDBR) protocol.
+ * 
+ * @version $Revision: 712224 $
+ */
+public class KDBRTransportFactory extends TcpTransportFactory {
+
+    protected String getDefaultWireFormatType() {
+        return "kdbr";
+    }
+
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        IntrospectionSupport.setProperties(transport, options);
+        return super.compositeConfigure(transport, format, options);
+    }
+
+    protected boolean isUseInactivityMonitor(Transport transport) {
+        return false;
+    }
+    
+    /**
+     * Overridden to remove the correlation transport filter, since it relies on the Command
+     * interface to multiplex multiple requests and this protocol does not support that.
+     */
+    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
+        transport = compositeConfigure(transport, wf, options);
+        transport = new MutexTransport(transport);
+
+        return transport;
+    }
+}
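
The factory above gets registered for the "kdbr" URI scheme by the META-INF/services entry added later in this commit, so a raw replication connection could be opened roughly as follows. This is a sketch that assumes the standard ActiveMQ TransportFactory.connect(URI) entry point and a placeholder address:

    import java.net.URI;
    import org.apache.activemq.transport.Transport;
    import org.apache.activemq.transport.TransportFactory;

    public class KdbrConnectExample {
        public static void main(String[] args) throws Exception {
            // The "kdbr" scheme resolves to KDBRTransportFactory via the services file.
            Transport transport = TransportFactory.connect(new URI("kdbr://localhost:60001"));
            transport.start();
            // ... exchange ReplicationFrame objects using the KDBR wire format ...
            transport.stop();
        }
    }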

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
+import org.apache.activemq.protobuf.Message;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.replication.ReplicationFrame;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+
+public class KDBRWireFormat implements WireFormat {
+
+	private int version;
+
+	public int getVersion() {
+		return version;
+	}
+
+	public void setVersion(int version) {
+		this.version = version;
+	}
+
+	public ByteSequence marshal(Object command) throws IOException {
+		throw new RuntimeException("Not implemented.");
+	}
+
+	public Object unmarshal(ByteSequence packet) throws IOException {
+		throw new RuntimeException("Not implemented.");
+	}
+
+	public void marshal(Object command, DataOutput out) throws IOException {
+		OutputStream os = (OutputStream) out;
+		ReplicationFrame frame = (ReplicationFrame) command;
+		PBHeader header = frame.getHeader();
+		switch (frame.getHeader().getType()) {
+		case FILE_TRANSFER_RESPONSE: {
+			// Write the header..
+			header.writeFramed(os);
+			// Stream the Payload.
+			InputStream is = (InputStream) frame.getPayload();
+			byte data[] = new byte[1024 * 4];
+			int c;
+			long remaining = frame.getHeader().getPayloadSize();
+			while (remaining > 0 && (c = is.read(data, 0, (int) Math.min(remaining, data.length))) >= 0) {
+				os.write(data, 0, c);
+				remaining -= c;
+			}
+			break;
+		}
+		default:
+			if (frame.getPayload() == null) {
+				header.clearPayloadSize();
+				header.writeFramed(os);
+			} else {
+				// All other payloads types are PB messages
+				Message message = (Message) frame.getPayload();
+				header.setPayloadSize(message.serializedSizeUnframed());
+				header.writeFramed(os);
+				message.writeUnframed(os);
+			}
+		}
+	}
+
+	public Object unmarshal(DataInput in) throws IOException {
+		InputStream is = (InputStream) in;
+		ReplicationFrame frame = new ReplicationFrame();
+		frame.setHeader(PBHeader.parseFramed(is));
+		switch (frame.getHeader().getType()) {
+		case FILE_TRANSFER_RESPONSE:
+			frame.setPayload(is);
+			break;
+		case FILE_TRANSFER:
+			readPBPayload(frame, in, new PBFileInfo());
+			break;
+		case JOURNAL_UPDATE:
+			readPBPayload(frame, in, new PBJournalUpdate());
+			break;
+		case JOURNAL_UPDATE_ACK:
+			readPBPayload(frame, in, new PBJournalLocation());
+			break;
+		case SLAVE_INIT:
+			readPBPayload(frame, in, new PBSlaveInit());
+			break;
+		case SLAVE_INIT_RESPONSE:
+			readPBPayload(frame, in, new PBSlaveInitResponse());
+			break;
+		}
+		return frame;
+	}
+
+	private void readPBPayload(ReplicationFrame frame, DataInput in, Message pb) throws IOException, InvalidProtocolBufferException {
+		long payloadSize = frame.getHeader().getPayloadSize();
+		byte[] payload;
+		payload = new byte[(int)payloadSize];
+		in.readFully(payload);
+		frame.setPayload(pb.mergeUnframed(payload));
+	}
+
+}
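
To illustrate the framing performed above (the PBHeader is written framed, the protobuf payload unframed), a SLAVE_INIT frame could be marshalled as sketched below. The PBHeader.setType and PBSlaveInit.setNodeId setters are assumed to follow the usual generated-protobuf naming, and the node id is a placeholder:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import org.apache.kahadb.replication.ReplicationFrame;
    import org.apache.kahadb.replication.pb.PBHeader;
    import org.apache.kahadb.replication.pb.PBSlaveInit;
    import org.apache.kahadb.replication.pb.PBType;
    import org.apache.kahadb.replication.transport.KDBRWireFormat;

    public class KdbrMarshalExample {
        public static void main(String[] args) throws Exception {
            ReplicationFrame frame = new ReplicationFrame();
            PBHeader header = new PBHeader();
            header.setType(PBType.SLAVE_INIT);
            frame.setHeader(header);
            PBSlaveInit init = new PBSlaveInit();
            init.setNodeId("slave-1");
            frame.setPayload(init);

            // marshal() casts the DataOutput to an OutputStream, so pass a DataOutputStream.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            new KDBRWireFormat().marshal(frame, new DataOutputStream(buffer));
            System.out.println("Marshalled " + buffer.size() + " bytes");
        }
    }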

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * @version $Revision: 712224 $
+ */
+public class KDBRWireFormatFactory implements WireFormatFactory {
+
+	public WireFormat createWireFormat() {
+		return new KDBRWireFormat();
+	}
+}

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,520 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.zk;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ClusterStateManager;
+import org.apache.kahadb.replication.ReplicationSupport;
+import org.apache.kahadb.replication.pb.PBClusterConfiguration;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * 
+ * @author chirino
+ * @org.apache.xbean.XBean element="zookeeperCluster"
+ */
+public class ZooKeeperClusterStateManager implements ClusterStateManager, Watcher {
+    private static final Log LOG = LogFactory.getLog(ZooKeeperClusterStateManager.class);
+
+    final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
+    private int startCounter;
+
+    private String uri = "zk://localhost:2181/activemq/ha-cluster/default";
+    String userid = "activemq";
+    String password = "";
+
+    private ZooKeeper zk;
+    private String path;
+
+    private ClusterState clusterState;
+    private String statusPath;
+    private PBClusterNodeStatus memberStatus;
+    private Thread takoverTask;
+    private boolean areWeTheBestMaster;
+
+    synchronized public void addListener(ClusterListener listener) {
+        listeners.add(listener);
+        fireClusterChange();
+    }
+
+    synchronized public void removeListener(ClusterListener listener) {
+        listeners.remove(listener);
+    }
+
+    synchronized private void updateClusterState(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        fireClusterChange();
+    }
+
+    synchronized private void fireClusterChange() {
+        if (startCounter > 0 && !listeners.isEmpty()) {
+            for (ClusterListener listener : listeners) {
+                listener.onClusterChange(clusterState);
+            }
+        }
+    }
+
+    synchronized public void start() throws Exception {
+        startCounter++;
+        if (startCounter == 1) {
+
+            // Make sure the path is set..
+            String path = getPath();
+
+            // Create a ZooKeeper connection..
+            zk = createZooKeeperConnection();
+
+            while( isStarted() ) {
+                try {
+                    mkParentDirs(path);
+                    try {
+                        zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    } catch (NodeExistsException ignore) {
+                    }
+                    processClusterStateChange();
+                    return;
+                } catch (Exception e) {
+                    handleZKError(e);
+                }
+            }
+        }
+    }
+    
+    synchronized private boolean isStarted() {
+        return startCounter > 0;
+    }
+
+    synchronized public void stop() throws Exception {
+        startCounter--;
+        if (startCounter == 0) {
+            zk.close();
+            zk = null;
+            path=null;
+            clusterState=null;
+            statusPath=null;
+            memberStatus=null;
+            takoverTask=null;
+            areWeTheBestMaster=false;
+            
+        }
+    }
+
+
+    public String getPath() {
+        if( path == null ) {
+            try {
+                URI uri = new URI(this.uri);
+                path = uri.getPath();
+                if (path == null) {
+                    throw new IllegalArgumentException("Invalid uri '" + uri + "', path to cluster configuration not specified");
+                }
+            } catch (URISyntaxException e) {
+                throw new IllegalArgumentException("Invalid uri '" + uri + "': "+e);
+            }
+        }
+        return path;
+    }
+
+    public ZooKeeper createZooKeeperConnection() throws URISyntaxException, IOException {
+        // Parse out the configuration URI.
+        URI uri = new URI(this.uri);
+        if (!uri.getScheme().equals("zk")) {
+            throw new IllegalArgumentException("Invalid uri '" + uri + "', expected it to start with zk://");
+        }
+        String host = uri.getHost();
+        if (host == null) {
+            throw new IllegalArgumentException("Invalid uri '" + uri + "', host not specified");
+        }
+        int port = uri.getPort();
+        if (port == -1) {
+            port = 2181;
+        }
+
+        ZooKeeper zk = new ZooKeeper(host, port, this);
+        zk.addAuthInfo("digest", (userid+":"+password).getBytes());
+        return zk;
+    }
+
+    private void processClusterStateChange() {
+        try {
+            if( zk==null ) {
+                return;
+            }
+
+            byte[] data = zk.getData(path, new Watcher() {
+                public void process(WatchedEvent event) {
+                    processClusterStateChange();
+                }
+            }, new Stat());
+            PBClusterConfiguration config = new PBClusterConfiguration();
+            config.mergeUnframed(data);
+            
+            ClusterState state = new ClusterState();
+            HashSet<String> slaves = new HashSet<String>(config.getMembersList());
+            if( config.hasMaster() ) {
+                state.setMaster(config.getMaster());
+                slaves.remove(config.getMaster());
+            }
+            state.setSlaves(new ArrayList<String>(slaves));
+            updateClusterState(state);
+        } catch (Exception e) {
+            LOG.error("Failed to process cluster state change", e);
+        }
+    }
+
+    public void process(WatchedEvent event) {
+        LOG.debug("Received ZooKeeper event: " + event);
+    }
+
+    public void setMemberStatus(final PBClusterNodeStatus status) {
+        while( isStarted() ) {
+            try {
+                
+                this.memberStatus = status;
+                if (statusPath == null) {
+                    mkdirs(path + "/election");
+                    statusPath = zk.create(path + "/election/n_", status.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+                } else {
+                    Stat stat = zk.exists(statusPath, false);
+                    if (status == null) {
+                        zk.delete(statusPath, stat.getVersion());
+                        statusPath = null;
+                    } else {
+                        zk.setData(statusPath, status.toUnframedByteArray(), stat.getVersion());
+                    }
+                }
+                processElectionChange();
+                return;
+                
+            } catch (Exception e) {
+                e.printStackTrace();
+                handleZKError(e);
+            }
+        }
+    }
+
+    synchronized private void processElectionChange() {
+        try {
+            if( zk==null ) {
+                return;
+            }
+            List<String> zkNodes = zk.getChildren(path + "/election", new Watcher() {
+                public void process(WatchedEvent event) {
+                    processElectionChange();
+                }
+            });
+            Map<String, PBClusterNodeStatus> children = processNodeStatus(zkNodes);
+            
+            if( children.isEmpty() ) {
+                return;
+            }            
+            String firstNodeId = children.keySet().iterator().next();
+            
+            // If we are the first child?
+            if( firstNodeId.equals(statusPath) ) {
+                
+                // If we are master already no need to do anything else
+                if ( memberStatus.getConnectUri().equals(clusterState.getMaster()) ) {
+                    return;
+                }
+            
+                // We may need to wait a little to figure out whether we are
+                // actually the best pick to be the master.
+                switch (memberStatus.getState()) {
+                case MASTER:
+                case SLAVE_ONLINE:
+                    // Can transition to master immediately.
+                    LOG.info("Online slave taking over as master.");
+                    setMaster(memberStatus.getConnectUri());
+                    return;
+
+                case SLAVE_SYNCRONIZING:
+                case SLAVE_UNCONNECTED:
+                    // If it looks like we are the best master, wait 5 seconds to let other
+                    // slaves join the cluster and get a chance to take over.
+                    if (areWeTheBestMaster(children)) {
+                        areWeTheBestMaster = true;
+                        if( takoverTask==null ) {
+                            LOG.info(memberStatus.getConnectUri()+" looks like the best offline slave that can take over as master.. waiting 5 secs to allow another slave to take over.");
+                            takoverTask = new Thread("Slave takeover..") {
+                                public void run() {
+                                    takoverAttempt();
+                                }
+                            };
+                            takoverTask.setDaemon(true);
+                            takoverTask.start();
+                        }
+                        return;
+                    } else {
+                        if( areWeTheBestMaster ) {
+                            LOG.info(memberStatus.getConnectUri()+" no longer looks like the best offline slave that can take over as master.");
+                        }
+                        areWeTheBestMaster = false;
+                        // If we get here we need to yield our top position in the node sequence
+                        // list so that the better slave can become the master.
+                        Stat stat = zk.exists(statusPath, false);
+                        zk.delete(statusPath, stat.getVersion());
+                        statusPath = zk.create(path + "/election/n_", memberStatus.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to process election change", e);
+        }
+    }
+
+    protected void takoverAttempt() {
+        try {
+            for( int i=0; i < 10; i++ ) {
+                Thread.sleep(500);
+                if( !isStarted() )
+                    return;
+            }
+            
+            synchronized(this) {
+                try {
+                    if( areWeTheBestMaster ) {
+                        LOG.info(memberStatus.getConnectUri()+" is taking over as master.");
+                        setMaster(memberStatus.getConnectUri());
+                    }
+                } finally {
+                    // We want to make sure we set takoverTask to null in the same mutex as we set the master.
+                    takoverTask=null;
+                }
+            }
+        } catch (Exception e) {
+        } finally {
+            // sleep might error out..
+            synchronized(this) {
+                takoverTask=null;
+            }
+        }
+    }
+
+    private boolean areWeTheBestMaster(Map<String, PBClusterNodeStatus> children) {
+        Location ourLocation = ReplicationSupport.convert(memberStatus.getLastUpdate());
+        for (Entry<String, PBClusterNodeStatus> entry : children.entrySet()) {
+            PBClusterNodeStatus status = entry.getValue();
+            switch (status.getState()) {
+            case MASTER:
+            case SLAVE_ONLINE:
+                return false;
+
+            case SLAVE_SYNCRONIZING:
+            case SLAVE_UNCONNECTED:
+                if (ourLocation.compareTo(ReplicationSupport.convert(status.getLastUpdate())) < 0) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private Map<String, PBClusterNodeStatus> processNodeStatus(List<String> children) throws KeeperException, InterruptedException, InvalidProtocolBufferException {
+        java.util.TreeMap<String, PBClusterNodeStatus> rc = new java.util.TreeMap<String, PBClusterNodeStatus>();
+        for (String nodeId : children) {
+            try {
+                Stat stat = new Stat();
+                byte[] data = zk.getData(path + "/election/" + nodeId, false, stat);
+                PBClusterNodeStatus status = new PBClusterNodeStatus();
+                status.mergeUnframed(data);
+                rc.put(path + "/election/" + nodeId, status);
+            } catch (NoNodeException ignore) {
+            }
+        }
+        return rc;
+    }
+
+    public void addMember(final String node) {
+        while( isStarted() ) {
+            try {
+                mkParentDirs(path);
+                update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+                    public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                        PBClusterConfiguration config = new PBClusterConfiguration();
+                        if (data != null) {
+                            config.mergeUnframed(data);
+                        }
+                        if (!config.getMembersList().contains(node)) {
+                            config.addMembers(node);
+                        }
+                        return config.toFramedByteArray();
+                    }
+                });
+                return;
+            } catch (Exception e) {
+                handleZKError(e);
+            }
+        }
+    }
+
+    public void removeMember(final String node) {
+        while( isStarted() ) {
+            try {
+                mkParentDirs(path);
+                update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+                    public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                        PBClusterConfiguration config = new PBClusterConfiguration();
+                        if (data != null) {
+                            config.mergeUnframed(data);
+                        }
+                        config.getMembersList().remove(node);
+                        return config.toFramedByteArray();
+                    }
+                });
+                return;
+            } catch (Exception e) {
+                handleZKError(e);
+            }
+        }
+    }
+
+    public void setMaster(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
+        mkParentDirs(path);
+        update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+            public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                PBClusterConfiguration config = new PBClusterConfiguration();
+                if (data != null) {
+                    config.mergeUnframed(data);
+                }
+                config.setMaster(node);
+                return config.toFramedByteArray();
+            }
+        });
+    }
+
+    interface Updater<T extends Throwable> {
+        byte[] update(byte[] data) throws T;
+    }
+
+    private <T extends Throwable> void update(String path, CreateMode persistent, Updater<T> updater) throws InvalidProtocolBufferException, KeeperException, InterruptedException, T {
+        Stat stat = zk.exists(path, false);
+        if (stat != null) {
+            byte[] data = zk.getData(path, false, stat);
+            data = updater.update(data);
+            zk.setData(path, data, stat.getVersion());
+        } else {
+            byte[] update = updater.update(null);
+            try {
+                zk.create(path, update, Ids.OPEN_ACL_UNSAFE, persistent);
+            } catch (NodeExistsException ignore) {
+                stat = zk.exists(path, false);
+                byte[] data = zk.getData(path, false, stat);
+                data = updater.update(data);
+                zk.setData(path, data, stat.getVersion());
+            }
+        }
+    }
+
+    private void mkParentDirs(String path) throws KeeperException, InterruptedException {
+        int lastIndexOf = path.lastIndexOf("/");
+        if (lastIndexOf >= 0) {
+            mkdirs(path.substring(0, lastIndexOf));
+        }
+    }
+
+    private void mkdirs(String path) throws KeeperException, InterruptedException {
+        if (zk.exists(path, false) != null) {
+            return;
+        }
+        // Remove the leading /
+        if (path.startsWith("/")) {
+            path = path.substring(1);
+        }
+        String[] split = path.split("/");
+        String cur = "";
+        for (String node : split) {
+            cur += "/" + node;
+            try {
+                zk.create(cur, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            } catch (NodeExistsException ignore) {
+            }
+        }
+    }
+
+    
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public String getUserid() {
+        return userid;
+    }
+
+    public void setUserid(String userid) {
+        this.userid = userid;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+    
+    private void handleZKError(Exception e) {
+        LOG.warn("ZooKeeper error.  Will retry operation in 1 second");
+        LOG.debug("The error was: "+e, e);
+        
+        for( int i=0; i < 10; i ++) { 
+            try {
+                if( !isStarted() )
+                    return;
+                Thread.sleep(100);
+            } catch (InterruptedException e1) {
+                return;
+            }
+        }
+    }
+
+}
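
The election above is built on ZooKeeper ephemeral sequential znodes: every node registers itself under <path>/election and the entry that sorts first is the preferred master. A minimal standalone sketch of that pattern, with a placeholder connect string and assuming the parent znodes already exist:

    import java.util.Collections;
    import java.util.List;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.ZooDefs.Ids;

    public class ElectionSketch {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, new Watcher() {
                public void process(WatchedEvent event) {
                }
            });
            // Each candidate registers an ephemeral, sequential znode under .../election.
            String me = zk.create("/activemq/ha-cluster/default/election/n_", new byte[0],
                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // The candidate whose znode sorts first wins; when its session dies the znode
            // disappears and the next candidate in sequence order can take over.
            List<String> children = zk.getChildren("/activemq/ha-cluster/default/election", false);
            Collections.sort(children);
            boolean first = me.endsWith(children.get(0));
            System.out.println(me + (first ? " would be elected master" : " stays a slave"));
        }
    }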

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto Fri Dec 11 19:39:58 2009
@@ -0,0 +1,147 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.kahadb.replication.pb;
+
+option java_multiple_files = true;
+option java_outer_classname = "PB";
+
+//
+// 
+//
+message PBHeader {
+    required PBType type=1;
+    optional int64 payload_size=2;    
+}
+
+enum PBType {
+
+	// Sent from the slave to the master when the slave first starts.  It lets the master
+	// know about the slave's synchronization state.  This allows the master to decide how best
+	// to synchronize the slave.
+	//
+	// @followed-by PBSlaveInit
+	SLAVE_INIT = 0;
+
+	// The master will send this response back to the slave, letting it know what it needs to do
+	// to get its data files synchronized with the master.
+	//
+	// @followed-by PBSlaveInitResponse
+	SLAVE_INIT_RESPONSE = 1;
+
+	// The slave will send this command to the master once it has completed
+	// all of its bulk synchronization and is ready to take over as a master.
+	//
+	// @followed-by null
+	SLAVE_ONLINE=2;
+  	
+	// Sent from the Master to the slave to replicate a Journal update.
+	//    
+	// @followed-by PBJournalUpdate	
+	JOURNAL_UPDATE=3;	
+	
+	// An ack sent back to the master in response to a received
+	// JOURNAL_UPDATE
+	//    
+	// @followed-by PBJournalLocation	
+	JOURNAL_UPDATE_ACK=4;
+	
+	// A request for a bulk file transfer.  Sent from a slave to a master.
+	//    
+	// @followed-by PBFileInfo	
+	FILE_TRANSFER=5;
+
+	// A bulk file transfer response
+	//    
+	// @followed-by the bytes of the requested file. 	
+	FILE_TRANSFER_RESPONSE=6;
+}
+
+message PBFileInfo {
+    required string name=1;
+    optional int32 snapshot_id=2;
+    optional sfixed64 checksum=3;    
+    optional int64 start=4;    
+    optional int64 end=5;    
+}
+
+message PBJournalLocation {
+	required int32 file_id=1;
+	required int32 offset=2; 
+}
+
+message PBSlaveInit {
+    // The id of the slave node that is being initialized
+    required string node_id=1;
+    // The files that the slave node currently has
+	repeated PBFileInfo current_files=2;	
+}
+
+message PBSlaveInitResponse {
+	// The files that the slave should bulk copy from the master..
+	repeated PBFileInfo copy_files=1;
+	// The files that the slave should delete
+	repeated string delete_files=2;
+}
+
+message PBJournalUpdate {
+    // Journal location of the update.
+    required PBJournalLocation location=1;
+    // The data that will be written at that location.
+    required bytes data=2;
+    // Should the slave send back an ack for this update.
+    optional bool send_ack=3;
+    // If true, then the slave should do a disk sync before returning a
+    // JOURNAL_UPDATE_ACK
+    optional bool disk_sync=4;
+}
+
+//
+// This holds the cluster-wide configuration.
+//
+message PBClusterConfiguration {
+    // It would be nice if the configuration of the broker were set up cluster-wide.  We could
+    // stuff the Spring config in here; that way, pushing out changes to the rest of the
+    // cluster would be very easy.
+    optional bytes broker_configuration=1;
+    // The nodes that have joined the cluster.  They may not all be online.
+    // Comes in handy for checking whether there are enough online members to form a quorum.
+    repeated string members=2;
+    // The last elected master.
+    optional string master=3;
+}
+
+message PBClusterNodeStatus {
+
+	enum State {
+		// When the slave initially starts up it 
+		// is not connected to a master.
+		SLAVE_UNCONNECTED = 0;
+		// When the slave first attaches to a master, it must first
+		// synchronize with the master to get any data updates
+		// that were missed while it was offline.
+		SLAVE_SYNCRONIZING = 1;
+		// The slave is caught up and is only actively applying
+		// real-time updates from the master.
+		SLAVE_ONLINE = 3;
+		// This node is the master.
+		MASTER = 4;
+	}
+
+    required State state=1;
+    optional string connect_uri=2;
+    optional PBJournalLocation last_update=3;
+}
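
A node reports one of these PBClusterNodeStatus states to its ClusterStateManager via setMemberStatus, and that status feeds master election. A minimal sketch, assuming the generated setters (setState, setConnectUri, setLastUpdate, setFileId, setOffset) follow the usual protobuf naming and using a placeholder connect URI:

    import org.apache.kahadb.replication.ClusterStateManager;
    import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
    import org.apache.kahadb.replication.pb.PBJournalLocation;

    public class StatusReportSketch {
        static void reportOnline(ClusterStateManager cluster, int fileId, int offset) {
            PBJournalLocation lastUpdate = new PBJournalLocation();
            lastUpdate.setFileId(fileId);
            lastUpdate.setOffset(offset);

            PBClusterNodeStatus status = new PBClusterNodeStatus();
            status.setState(PBClusterNodeStatus.State.SLAVE_ONLINE);
            status.setConnectUri("kdbr://localhost:60001");
            status.setLastUpdate(lastUpdate);

            // The state manager uses this to weight the node during master election.
            cluster.setMemberStatus(status);
        }
    }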

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml Fri Dec 11 19:39:58 2009
@@ -0,0 +1,153 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
+  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+    <!-- Allows us to use system properties as variables in this configuration file -->
+    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+         <property name="locations">
+            <value>file:${activemq.base}/conf/credentials.properties</value>
+         </property>      
+    </bean>
+
+    <broker xmlns="http://activemq.apache.org/schema/core" 
+      start="false" 
+      brokerName="localhost" dataDirectory="${activemq.base}/data">
+
+        <!-- Destination specific policies using destination names or wildcards -->
+        <destinationPolicy>
+            <policyMap>
+                <policyEntries>
+                    <policyEntry queue=">" memoryLimit="5mb"/>
+                    <policyEntry topic=">" memoryLimit="5mb">
+                      <!-- you can add other policies too such as these
+                        <dispatchPolicy>
+                            <strictOrderDispatchPolicy/>
+                        </dispatchPolicy>
+                        <subscriptionRecoveryPolicy>
+                            <lastImageSubscriptionRecoveryPolicy/>
+                        </subscriptionRecoveryPolicy>
+                      -->
+                    </policyEntry>
+                </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+
+        <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+
+        <!-- The store and forward broker networks ActiveMQ will listen to -->
+        <networkConnectors>
+            <!-- by default just auto discover the other brokers -->
+            <networkConnector name="default-nc" uri="multicast://default"/>
+            <!-- Example of a static configuration:
+            <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
+            -->
+        </networkConnectors>
+
+        <sslContext>
+            <sslContext keyStore="file:${activemq.base}/conf/broker.ks" keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts" trustStorePassword="password"/>
+        </sslContext>
+        
+        <!--  The maximum amount of space the broker will use before slowing down producers -->
+        <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage limit="20 mb"/>
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="1 gb" name="foo"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="100 mb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+
+        <!-- The transport connectors ActiveMQ will listen to -->
+        <transportConnectors>
+            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
+            <transportConnector name="ssl" uri="ssl://localhost:61617"/>
+            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
+            <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
+        </transportConnectors>
+
+    </broker>
+
+    <!--
+    ** Lets deploy some Enterprise Integration Patterns inside the ActiveMQ Message Broker
+    ** For more details see
+    **
+    ** http://activemq.apache.org/enterprise-integration-patterns.html
+    -->
+    <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+
+        <!-- You can use a <package> element for each root package to search for Java routes -->
+        <package>org.foo.bar</package>
+
+        <!-- You can use Spring XML syntax to define the routes here using the <route> element -->
+        <route>
+            <from uri="activemq:example.A"/>
+            <to uri="activemq:example.B"/>
+        </route>
+    </camelContext>
+
+    <!--
+    ** Lets configure some Camel endpoints
+    **
+    ** http://activemq.apache.org/camel/components.html
+    -->
+
+    <!-- configure the camel activemq component to use the current broker -->
+    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
+        <property name="connectionFactory">
+          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
+            <property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
+            <property name="userName" value="${activemq.username}"/>
+            <property name="password" value="${activemq.password}"/>
+          </bean>
+        </property>
+    </bean>
+
+    <!-- Uncomment to create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
+    <!--
+    <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost" username="${activemq.username}" password="${activemq.password}"/>
+    -->
+
+    <!-- An embedded servlet engine for serving up the Admin console -->
+    <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
+        <connectors>
+            <nioConnector port="8161"/>
+        </connectors>
+
+        <handlers>
+            <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
+            <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
+            <webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
+        </handlers>
+    </jetty>
+
+</beans>
+<!-- END SNIPPET: example -->

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml Fri Dec 11 19:39:58 2009
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans xmlns="http://www.springframework.org/schema/beans" 
+       xmlns:amq="http://activemq.apache.org/schema/core" 
+       xmlns:kdb="http://activemq.apache.org/schema/kahadb" 
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+       xsi:schemaLocation="http://www.springframework.org/schema/beans 
+                           http://www.springframework.org/schema/beans/spring-beans-2.0.xsd   
+                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+                           http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd   
+                           http://activemq.apache.org/schema/kahadb http://activemq.apache.org/schema/kahadb/kahadb.xsd">
+                          
+  <!-- Allows us to use system properties as variables in this configuration file -->
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+    <property name="locations">
+      <value>file:${activemq.base}/conf/credentials.properties</value>
+    </property>
+  </bean>
+  
+  <kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
+    <replicationService>
+      <kahadbReplication 
+        directory="${activemq.base}/data/kahadb" 
+        brokerURI="xbean:ha-broker.xml" 
+        uri="kdbr://localhost:60001"
+        minimumReplicas="1">
+        
+        <cluster>
+          <zookeeperCluster uri="zk://localhost:2181/activemq/default-ha-group" userid="activemq" password=""/>
+        </cluster>
+        
+      </kahadbReplication>
+    </replicationService>
+  </kahadbReplicationBroker>
+  
+</beans>
+<!-- END SNIPPET: example -->
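
The <zookeeperCluster> element above maps onto the ZooKeeperClusterStateManager added earlier in this commit. The equivalent programmatic configuration, using only the setters that class exposes, looks roughly like this:

    import org.apache.kahadb.replication.zk.ZooKeeperClusterStateManager;

    public class HaClusterConfigSketch {
        public static void main(String[] args) throws Exception {
            // Same values as the <zookeeperCluster> element in ha.xml, set directly.
            ZooKeeperClusterStateManager cluster = new ZooKeeperClusterStateManager();
            cluster.setUri("zk://localhost:2181/activemq/default-ha-group");
            cluster.setUserid("activemq");
            cluster.setPassword("");
            cluster.start();
        }
    }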

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr Fri Dec 11 19:39:58 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRTransportFactory

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr Fri Dec 11 19:39:58 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRWireFormatFactory
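
Likewise, this entry maps the "kdbr" key to KDBRWireFormatFactory. A minimal sketch of that lookup, assuming ActiveMQ's FactoryFinder utility and the standard WireFormatFactory contract:

    import org.apache.activemq.util.FactoryFinder;
    import org.apache.activemq.wireformat.WireFormat;
    import org.apache.activemq.wireformat.WireFormatFactory;

    public class KdbrWireFormatExample {
        public static void main(String[] args) throws Exception {
            // FactoryFinder reads the services path this file lives under and
            // instantiates the class named by its "class" property.
            FactoryFinder finder = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
            WireFormatFactory factory = (WireFormatFactory) finder.newInstance("kdbr");
            WireFormat wireFormat = factory.createWireFormat();
            System.out.println("Resolved wire format: " + wireFormat.getClass().getName());
        }
    }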

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties Fri Dec 11 19:39:58 2009
@@ -0,0 +1,36 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for Eclipse testing. We want to see log output on the console.
+#
+log4j.rootLogger=INFO, out
+
+
+
+# Console appender (used by the root logger above)
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.fout=org.apache.log4j.FileAppender
+log4j.appender.fout.layout=org.apache.log4j.PatternLayout
+log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.fout.file=target/amq-testlog.log
+log4j.appender.fout.append=true
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.util.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class ReplicationTest extends TestCase {
+
+	
+	private static final String BROKER1_URI = "tcp://localhost:61001";
+	private static final String BROKER2_URI = "tcp://localhost:61002";
+
+	private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
+	private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
+	
+	private Destination destination = new ActiveMQQueue("TEST_QUEUE");
+
+	public void testReplication() throws Exception {
+		
+		// This cluster object will control who becomes the master.
+		StaticClusterStateManager cluster = new StaticClusterStateManager();
+		
+		ReplicationService rs1 = new ReplicationService();
+		rs1.setMinimumReplicas(0);
+		rs1.setUri(BROKER1_REPLICATION_ID);
+		rs1.setCluster(cluster);
+		rs1.setDirectory(new File("target/replication-test/broker1"));
+		rs1.setBrokerURI("broker://("+BROKER1_URI+")/broker1");
+		rs1.start();
+
+        ReplicationService rs2 = new ReplicationService();
+        rs2.setMinimumReplicas(0);
+        rs2.setUri(BROKER2_REPLICATION_ID);
+        rs2.setCluster(cluster);
+        rs2.setDirectory(new File("target/replication-test/broker2"));
+        rs2.setBrokerURI("broker://(" + BROKER2_URI + ")/broker2");
+        rs2.start();
+		
+//		// None of the brokers should be accepting connections since they are not masters.
+//		try {
+//			sendMessagesTo(BROKER1_URI, 1, "Pass 0: ");
+//			fail("Connection failure expected.");
+//		} catch( JMSException e ) {
+//		}
+		
+		// Make b1 the master.
+		ClusterState clusterState = new ClusterState();
+		clusterState.setMaster(BROKER1_REPLICATION_ID);
+		cluster.setClusterState(clusterState);
+		
+		try {
+			sendMessagesTo(BROKER1_URI, 100, "Pass 1: ");
+		} catch( JMSException e ) {
+			fail("b1 did not become a master.");
+		}
+		
+		// Make broker 2 a slave.
+		clusterState = new ClusterState();
+		clusterState.setMaster(BROKER1_REPLICATION_ID);
+		String[] slaves = {BROKER2_REPLICATION_ID};
+		clusterState.setSlaves(Arrays.asList(slaves));
+		cluster.setClusterState(clusterState);
+		
+		Thread.sleep(1000);
+		
+		
+		try {
+			sendMessagesTo(BROKER1_URI, 100, "Pass 2: ");
+		} catch( JMSException e ) {
+			fail("Failed to send more messages...");
+		}
+		
+		Thread.sleep(2000);
+		
+		// Make broker 2 the master.
+		clusterState = new ClusterState();
+		clusterState.setMaster(BROKER2_REPLICATION_ID);
+		cluster.setClusterState(clusterState);
+
+		Thread.sleep(1000);
+		
+		assertReceived(200, BROKER2_URI);
+		
+		rs2.stop();		
+		rs1.stop();
+		
+	}
+
+	private void assertReceived(int count, String brokerUri) throws JMSException {
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+		Connection con = cf.createConnection();
+		con.start();
+		try {
+			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageConsumer consumer = session.createConsumer(destination);
+			for (int i = 0; i < count; i++) {
+				TextMessage m = (TextMessage) consumer.receive(1000);
+				if( m==null ) {
+					fail("Failed to receive message: "+i);
+				}
+				System.out.println("Got: "+m.getText());
+			}
+		} finally {
+			try { con.close(); } catch (Throwable e) {}
+		}
+	}
+
+	private void sendMessagesTo(String brokerUri, int count, String msg) throws JMSException {
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+		Connection con = cf.createConnection();
+		try {
+			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(destination);
+			for (int i = 0; i < count; i++) {
+				producer.send(session.createTextMessage(msg+i));
+			}
+		} finally {
+			try { con.close(); } catch (Throwable e) {}
+		}
+	}
+	
+}

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.ArrayList;
+
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
+public class StaticClusterStateManager implements ClusterStateManager {
+
+	final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
+	private ClusterState clusterState;
+	private int startCounter;
+
+	synchronized public ClusterState getClusterState() {
+		return clusterState;
+	}
+
+	synchronized public void setClusterState(ClusterState clusterState) {
+		this.clusterState = clusterState;
+		fireClusterChange();
+	}
+
+	synchronized public void addListener(ClusterListener listener) {
+		listeners.add(listener);
+		fireClusterChange();
+	}
+
+	synchronized public void removeListener(ClusterListener listener) {
+		listeners.remove(listener);
+	}
+
+	synchronized public void start() throws Exception {
+		startCounter++;
+		fireClusterChange();
+	}
+
+	synchronized private void fireClusterChange() {
+		if( startCounter>0 && !listeners.isEmpty() && clusterState!=null ) {
+			for (ClusterListener listener : listeners) {
+				listener.onClusterChange(clusterState);
+			}
+		}
+	}
+
+	synchronized public void stop() throws Exception {
+		startCounter--;
+	}
+
+    public void addMember(String node) {
+    }
+
+    public void removeMember(String node) {
+    }
+
+    public void setMemberStatus(PBClusterNodeStatus status) {
+    }
+
+}

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.kahadb.replication.zk.ZooKeeperClusterStateManager;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+public class XBeanReplicationTest extends TestCase {
+
+	
+	private static final String BROKER1_URI = "tcp://localhost:61616";
+	private static final String BROKER2_URI = "tcp://localhost:61617";
+
+	private Destination destination = new ActiveMQQueue("TEST_QUEUE");
+	
+    private static final int PORT = 2181;
+    private Factory serverFactory;
+
+	public void testReplication() throws Exception {
+		
+        startZooKeeper();
+
+		BrokerService broker1 = BrokerFactory.createBroker("xbean:broker1/ha.xml");
+        broker1.start();
+        
+        // Wait for the broker to get set up.
+        Thread.sleep(7000);
+        
+        sendMessagesTo(BROKER1_URI, 100, "Pass 1: ");
+
+        // Create broker 2 which will join in and sync up with the existing master.
+        BrokerService broker2 = BrokerFactory.createBroker("xbean:broker2/ha.xml");
+        broker2.start();
+		
+        // Give it some time to sync up..
+        Thread.sleep(1000);
+        
+        // Stopping broker1 should make broker2 the master.
+        broker1.stop();
+
+        Thread.sleep(1000);
+        
+        // Did all the messages get synced up?
+		assertReceived(100, BROKER2_URI);
+		// Send some more messages...
+        sendMessagesTo(BROKER2_URI, 50, "Pass 2: ");
+		
+		// Start broker1 up again; it should re-sync with the new master (broker2).
+		broker1.start();
+        // Give it some time to sync up..
+        Thread.sleep(1000);
+		
+		// Stop the current master (broker2); broker1 should take over.
+		broker2.stop();
+		
+		// Did the new state get synced right?
+        assertReceived(50, BROKER1_URI);
+		
+        broker1.stop();
+        
+        stopZooKeeper();
+
+	}
+
+    private void stopZooKeeper() {
+        serverFactory.shutdown();
+        ServerStats.unregister();
+    }
+
+    private void startZooKeeper() throws IOException, InterruptedException, URISyntaxException {
+        ServerStats.registerAsConcrete();
+        File zooKeeperData = new File("target/test-data/zookeeper-"+System.currentTimeMillis());
+        zooKeeperData.mkdirs();
+
+        // Reduces startup time..
+        System.setProperty("zookeeper.preAllocSize", "100");
+        FileTxnLog.setPreallocSize(100);
+        ZooKeeperServer zs = new ZooKeeperServer(zooKeeperData, zooKeeperData, 3000);
+        
+        serverFactory = new NIOServerCnxn.Factory(PORT);
+        serverFactory.startup(zs);
+        
+        ZooKeeperClusterStateManager zkcsm1 = new ZooKeeperClusterStateManager();
+        zkcsm1.setUri("zk://localhost:"+PORT+"/");
+        ZooKeeper zk = zkcsm1.createZooKeeperConnection();
+        
+        // Wait till the ZK client gets connected..
+        States state;
+        while( (state = zk.getState()) != States.CONNECTED ) {
+            Thread.sleep(100);
+        }
+        zk.close();
+        
+    }
+
+	private void assertReceived(int count, String brokerUri) throws JMSException {
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+		Connection con = cf.createConnection();
+		con.start();
+		try {
+			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageConsumer consumer = session.createConsumer(destination);
+			for (int i = 0; i < count; i++) {
+				TextMessage m = (TextMessage) consumer.receive(1000);
+				if( m==null ) {
+					fail("Failed to receive message: "+i);
+				}
+				System.out.println("Got: "+m.getText());
+			}
+		} finally {
+			try { con.close(); } catch (Throwable e) {}
+		}
+	}
+
+	private void sendMessagesTo(String brokerUri, int count, String msg) throws JMSException {
+		ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+		Connection con = cf.createConnection();
+		try {
+			Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageProducer producer = session.createProducer(destination);
+			for (int i = 0; i < count; i++) {
+				producer.send(session.createTextMessage(msg+i));
+			}
+		} finally {
+			try { con.close(); } catch (Throwable e) {}
+		}
+	}
+	
+}

Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.blaze;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.apache.activemq.util.Callback;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+
+/**
+ * @author rajdavies
+ * 
+ */
+public class BlazeClusterStateManagerTest extends TestCase {
+    public void testTwoNodesGoingOnline() throws Exception {
+        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+        final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();
+        final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
+        bcsm1.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents1.add(config);
+            }
+        });
+        bcsm1.start();
+        bcsm1.addMember("kdbr://localhost:60001");
+        final BlazeClusterStateManager bcsm2 = new BlazeClusterStateManager();
+        bcsm2.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents2.add(config);
+            }
+        });
+        bcsm2.start();
+        bcsm2.addMember("kdbr://localhost:60002");
+        // Drain the events..
+        while (stateEvents1.poll(100, TimeUnit.MILLISECONDS) != null) {
+        }
+        while (stateEvents2.poll(100, TimeUnit.MILLISECONDS) != null) {
+        }
+        // Bring node 1 online
+        final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
+        status1.setConnectUri("kdbr://localhost:60001");
+        status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
+        status1.setState(State.SLAVE_UNCONNECTED);
+        executeAsync(new Callback() {
+            public void execute() throws Exception {
+                bcsm1.setMemberStatus(status1);
+            }
+        });
+        // Bring node 2 online
+        final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
+        status2.setConnectUri("kdbr://localhost:60002");
+        status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
+        status2.setState(State.SLAVE_UNCONNECTED);
+        executeAsync(new Callback() {
+            public void execute() throws Exception {
+                Thread.sleep(1000);
+                bcsm2.setMemberStatus(status2);
+            }
+        });
+        ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
+        assertNotNull(state);
+        assertNotNull(state.getMaster());
+        assertEquals("kdbr://localhost:60002", state.getMaster());
+        assertTrue(state.getSlaves().size() == 1);
+        state = stateEvents2.poll(2, TimeUnit.SECONDS);
+        assertNotNull(state);
+        assertNotNull(state.getMaster());
+        assertEquals("kdbr://localhost:60002", state.getMaster());
+        assertTrue(state.getSlaves().size() == 1);
+        bcsm2.stop();
+        bcsm1.stop();
+    }
+
+    public void testOneNodeGoingOnline() throws Exception {
+        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+        final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
+        bcsm1.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents1.add(config);
+            }
+        });
+        bcsm1.start();
+        // Drain the events..
+        while (stateEvents1.poll(100, TimeUnit.MILLISECONDS) != null) {
+        }
+        // Let node1 join the cluster.
+        bcsm1.addMember("kdbr://localhost:60001");
+        ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
+        assertNotNull(state);
+        assertNull(state.getMaster());
+        assertTrue(state.getSlaves().size() == 1);
+        // Let the cluster know that node1 is online..
+        PBClusterNodeStatus status = new PBClusterNodeStatus();
+        status.setConnectUri("kdbr://localhost:60001");
+        status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
+        status.setState(State.SLAVE_UNCONNECTED);
+        bcsm1.setMemberStatus(status);
+        state = stateEvents1.poll(10, TimeUnit.SECONDS);
+        assertNotNull(state);
+        assertNotNull(state.getMaster());
+        assertEquals("kdbr://localhost:60001", state.getMaster());
+        assertTrue(state.getSlaves().isEmpty());
+        bcsm1.stop();
+    }
+
+    private void executeAsync(final Callback callback) {
+        new Thread("Async Test Task") {
+            @Override
+            public void run() {
+                try {
+                    callback.execute();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+    }
+}