You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/12/11 20:40:02 UTC
svn commit: r889781 [2/3] - in
/activemq/sandbox/activemq-apollo/activemq-kahadb-replication: ./ src/
src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/kahadb/
src/main/java/org/apache/kahadb/replication/ sr...
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManager.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.blaze;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannel;
+import org.apache.activeblaze.cluster.BlazeClusterGroupChannelFactory;
+import org.apache.activeblaze.cluster.BlazeClusterGroupConfiguration;
+import org.apache.activeblaze.cluster.MasterChangedListener;
+import org.apache.activeblaze.group.Member;
+import org.apache.activeblaze.group.MemberChangedListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ClusterStateManager;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
+/**
+ *
+ * @author rajdavies
+ * @org.apache.xbean.XBean element="blazeCluster"
+ */
+public class BlazeClusterStateManager extends BlazeClusterGroupConfiguration implements ClusterStateManager,
+ MemberChangedListener, MasterChangedListener {
+ private static final Log LOG = LogFactory.getLog(BlazeClusterStateManager.class);
+ private BlazeClusterGroupChannel channel;
+ private AtomicBoolean started = new AtomicBoolean();
+ private ClusterState clusterState;
+ private String localMemberName;
+ private PBClusterNodeStatus status;
+ final private List<ClusterListener> listeners = new CopyOnWriteArrayList<ClusterListener>();
+
+ /**
+ * @param listener
+ * @see org.apache.kahadb.replication.ClusterStateManager#addListener(org.apache.kahadb.replication.ClusterListener)
+ */
+ public void addListener(ClusterListener listener) {
+ this.listeners.add(listener);
+ initializeChannel();
+ fireClusterChange();
+ }
+
+ /**
+ * @param node
+ * @see org.apache.kahadb.replication.ClusterStateManager#addMember(java.lang.String)
+ */
+ public void addMember(String node) {
+ this.localMemberName = node;
+ initializeChannel();
+ }
+
+ /**
+ * @param listener
+ * @see org.apache.kahadb.replication.ClusterStateManager#removeListener(org.apache.kahadb.replication.ClusterListener)
+ */
+ public void removeListener(ClusterListener listener) {
+ this.listeners.remove(listener);
+ }
+
+ /**
+ * @param node
+ * @see org.apache.kahadb.replication.ClusterStateManager#removeMember(java.lang.String)
+ */
+ public void removeMember(String node) {
+ }
+
+ /**
+ * @param status
+ * @see org.apache.kahadb.replication.ClusterStateManager#setMemberStatus(org.apache.kahadb.replication.pb.PBClusterNodeStatus)
+ */
+ public void setMemberStatus(PBClusterNodeStatus status) {
+ if (status != null) {
+ this.status = status;
+ setMasterWeight(status.getState().getNumber());
+ if (this.channel != null) {
+ this.channel.getConfiguration().setMasterWeight(getMasterWeight());
+ try {
+ this.channel.waitForElection(getAwaitGroupTimeout());
+ } catch (Exception e) {
+ LOG.error("Wait for Election Failed");
+ }
+ }
+ processClusterStateChange();
+ }
+ }
+
+ /**
+ * @param arg0
+ * @see org.apache.activeblaze.group.MemberChangedListener#memberStarted(org.apache.activeblaze.group.Member)
+ */
+ public void memberStarted(Member arg0) {
+ processClusterStateChange();
+ }
+
+ /**
+ * @param arg0
+ * @see org.apache.activeblaze.group.MemberChangedListener#memberStopped(org.apache.activeblaze.group.Member)
+ */
+ public void memberStopped(Member arg0) {
+ processClusterStateChange();
+ }
+
+ /**
+ * @param arg0
+ * @see org.apache.activeblaze.cluster.MasterChangedListener#masterChanged(org.apache.activeblaze.group.Member)
+ */
+ public void masterChanged(Member arg0) {
+ processClusterStateChange();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.apache.activemq.Service#start()
+ */
+ public void start() throws Exception {
+ if (this.started.compareAndSet(false, true)) {
+ initializeChannel();
+ }
+ this.started.set(true);
+ }
+
+ /**
+ * @throws Exception
+ * @see org.apache.activemq.Service#stop()
+ */
+ public void stop() throws Exception {
+ if (this.started.compareAndSet(true, false)) {
+ if (this.channel != null) {
+ this.channel.removeMemberChangedListener(this);
+ this.channel.removeMasterChangedListener(this);
+ this.channel.shutDown();
+ this.channel = null;
+ }
+ }
+ this.started.set(false);
+ }
+
+ private boolean isStarted() {
+ return this.started.get();
+ }
+
+ synchronized private void updateClusterState(ClusterState clusterState) {
+ this.clusterState = clusterState;
+ fireClusterChange();
+ }
+
+ private void fireClusterChange() {
+ if (isStarted() && !this.listeners.isEmpty() && this.clusterState != null) {
+ for (ClusterListener listener : this.listeners) {
+ listener.onClusterChange(this.clusterState);
+ }
+ }
+ }
+
+ private void processClusterStateChange() {
+ if (isStarted()) {
+ try {
+ ClusterState state = new ClusterState();
+ this.channel.waitForElection(getAwaitGroupTimeout());
+ Set<Member> members = this.channel.getMembers();
+ Member master = this.channel.getMaster();
+ if (master != null) {
+ // check we can be the master
+ if (!this.channel.isMaster() || (this.status != null)) {
+ state.setMaster(master.getName());
+ members.remove(master);
+ }
+ }
+ List<String> slaves = new ArrayList<String>();
+ for (Member slave : members) {
+ slaves.add(slave.getName());
+ }
+ state.setSlaves(slaves);
+ updateClusterState(state);
+ } catch (Exception e) {
+ LOG.error("Failed to process Cluster State Changed", e);
+ }
+ }
+ }
+
+ private synchronized void initializeChannel() {
+ if (this.localMemberName != null && this.channel == null) {
+ try {
+ BlazeClusterGroupChannelFactory factory = new BlazeClusterGroupChannelFactory(this);
+ this.channel = factory.createChannel(this.localMemberName);
+ this.channel.addMemberChangedListener(this);
+ this.channel.addMasterChangedListener(this);
+ if (isStarted()) {
+ this.channel.start();
+ this.channel.waitForElection(getAwaitGroupTimeout());
+ }
+ processClusterStateChange();
+ } catch (Exception e) {
+ LOG.error("Failed to create channel", e);
+ }
+ }
+ }
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRTransportFactory.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.util.Map;
+
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Transport factory for the KahaDB replication protocol. It extends
+ * {@link TcpTransportFactory} and names "kdbr" as its default wire format.
+ *
+ * @version $Revision: 712224 $
+ */
+public class KDBRTransportFactory extends TcpTransportFactory {
+
+ /**
+ * @return the default wire format type, "kdbr"
+ */
+ protected String getDefaultWireFormatType() {
+ return "kdbr";
+ }
+
+ /**
+ * Applies the given options to the transport via
+ * {@link IntrospectionSupport#setProperties} and then delegates to the
+ * superclass implementation.
+ *
+ * @param transport the transport being configured
+ * @param format the wire format in use
+ * @param options the configuration options
+ * @return the transport returned by the superclass
+ */
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ IntrospectionSupport.setProperties(transport, options);
+ return super.compositeConfigure(transport, format, options);
+ }
+
+ /**
+ * @return always false
+ */
+ protected boolean isUseInactivityMonitor(Transport transport) {
+ return false;
+ }
+
+ /**
+ * Override to remove the correlation transport filter since that relies on Command to
+ * multiplex multiple requests and this protocol does not support that.
+ * The configured transport is wrapped in a {@link MutexTransport} only.
+ */
+ public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
+ transport = compositeConfigure(transport, wf, options);
+ transport = new MutexTransport(transport);
+
+ return transport;
+ }
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormat.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
+import org.apache.activemq.protobuf.Message;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.kahadb.replication.ReplicationFrame;
+import org.apache.kahadb.replication.pb.PBFileInfo;
+import org.apache.kahadb.replication.pb.PBHeader;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBJournalUpdate;
+import org.apache.kahadb.replication.pb.PBSlaveInit;
+import org.apache.kahadb.replication.pb.PBSlaveInitResponse;
+
+/**
+ * Wire format of the KahaDB replication ("kdbr") protocol.
+ * <p>
+ * Every frame is a framed {@link PBHeader} followed by an optional payload.
+ * For FILE_TRANSFER_RESPONSE frames the payload is a raw byte stream of
+ * {@code payload_size} bytes; for all other frame types it is an unframed
+ * protobuf message. Only the stream based marshal/unmarshal methods are
+ * implemented, and they require the {@link DataOutput}/{@link DataInput}
+ * arguments to also be an {@link OutputStream}/{@link InputStream}.
+ */
+public class KDBRWireFormat implements WireFormat {
+
+    private static final int COPY_BUFFER_SIZE = 1024 * 4;
+
+    private int version;
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * Not supported by this wire format.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public ByteSequence marshal(Object command) throws IOException {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    /**
+     * Not supported by this wire format.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public Object unmarshal(ByteSequence packet) throws IOException {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    /**
+     * Writes a {@link ReplicationFrame} to the stream.
+     *
+     * @param command the frame to write; must be a {@link ReplicationFrame}
+     * @param out the destination; must also be an {@link OutputStream}
+     * @throws IOException if writing fails, or if a FILE_TRANSFER_RESPONSE
+     *         payload stream ends before the size declared in the header
+     */
+    public void marshal(Object command, DataOutput out) throws IOException {
+        OutputStream os = (OutputStream) out;
+        ReplicationFrame frame = (ReplicationFrame) command;
+        PBHeader header = frame.getHeader();
+        switch (header.getType()) {
+        case FILE_TRANSFER_RESPONSE:
+            // The header's payload size is expected to be set already; it
+            // tells the receiver how many raw bytes follow.
+            header.writeFramed(os);
+            copyPayload((InputStream) frame.getPayload(), os, header.getPayloadSize());
+            break;
+        default:
+            if (frame.getPayload() == null) {
+                header.clearPayloadSize();
+                header.writeFramed(os);
+            } else {
+                // All other payloads types are PB messages
+                Message message = (Message) frame.getPayload();
+                header.setPayloadSize(message.serializedSizeUnframed());
+                header.writeFramed(os);
+                message.writeUnframed(os);
+            }
+        }
+    }
+
+    /**
+     * Copies exactly {@code length} bytes from {@code is} to {@code os}.
+     * A short source is an error: the header already promised {@code length}
+     * bytes, so silently writing fewer would corrupt the framing of every
+     * frame that follows.
+     */
+    private static void copyPayload(InputStream is, OutputStream os, long length) throws IOException {
+        byte[] buffer = new byte[COPY_BUFFER_SIZE];
+        long remaining = length;
+        while (remaining > 0) {
+            int count = is.read(buffer, 0, (int) Math.min(remaining, buffer.length));
+            if (count < 0) {
+                throw new IOException("Payload stream ended " + remaining
+                        + " bytes before the declared payload size of " + length);
+            }
+            os.write(buffer, 0, count);
+            remaining -= count;
+        }
+    }
+
+    /**
+     * Reads one {@link ReplicationFrame} from the stream. For
+     * FILE_TRANSFER_RESPONSE frames the payload is the input stream itself,
+     * positioned at the first payload byte; the caller has to consume it.
+     *
+     * @param in the source; must also be an {@link InputStream}
+     * @return the frame that was read
+     * @throws IOException if reading or parsing fails
+     */
+    public Object unmarshal(DataInput in) throws IOException {
+        InputStream is = (InputStream) in;
+        ReplicationFrame frame = new ReplicationFrame();
+        frame.setHeader(PBHeader.parseFramed(is));
+        switch (frame.getHeader().getType()) {
+        case FILE_TRANSFER_RESPONSE:
+            frame.setPayload(is);
+            break;
+        case FILE_TRANSFER:
+            readPBPayload(frame, in, new PBFileInfo());
+            break;
+        case JOURNAL_UPDATE:
+            readPBPayload(frame, in, new PBJournalUpdate());
+            break;
+        case JOURNAL_UPDATE_ACK:
+            readPBPayload(frame, in, new PBJournalLocation());
+            break;
+        case SLAVE_INIT:
+            readPBPayload(frame, in, new PBSlaveInit());
+            break;
+        case SLAVE_INIT_RESPONSE:
+            readPBPayload(frame, in, new PBSlaveInitResponse());
+            break;
+        default:
+            // Any other frame type is returned with a null payload.
+            break;
+        }
+        return frame;
+    }
+
+    /**
+     * Reads the payload announced by the frame header and parses it into {@code pb}.
+     *
+     * @throws IOException if the announced size cannot be held in a byte array
+     *         or the stream ends early
+     */
+    private void readPBPayload(ReplicationFrame frame, DataInput in, Message pb) throws IOException, InvalidProtocolBufferException {
+        long payloadSize = frame.getHeader().getPayloadSize();
+        // The size comes off the wire: reject values the int cast below would mangle.
+        if (payloadSize < 0 || payloadSize > Integer.MAX_VALUE) {
+            throw new IOException("Invalid payload size in frame header: " + payloadSize);
+        }
+        byte[] payload = new byte[(int) payloadSize];
+        in.readFully(payload);
+        frame.setPayload(pb.mergeUnframed(payload));
+    }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/transport/KDBRWireFormatFactory.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.transport;
+
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
+ * @version $Revision: 712224 $
+ */
+public class KDBRWireFormatFactory implements WireFormatFactory {
+
+ public WireFormat createWireFormat() {
+ return new KDBRWireFormat();
+ }
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,520 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.zk;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ClusterStateManager;
+import org.apache.kahadb.replication.ReplicationSupport;
+import org.apache.kahadb.replication.pb.PBClusterConfiguration;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ *
+ * @author chirino
+ * @org.apache.xbean.XBean element="zookeeperCluster"
+ */
+public class ZooKeeperClusterStateManager implements ClusterStateManager, Watcher {
+ private static final Log LOG = LogFactory.getLog(ZooKeeperClusterStateManager.class);
+
+ final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
+ private int startCounter;
+
+ private String uri = "zk://localhost:2181/activemq/ha-cluster/default";
+ String userid = "activemq";
+ String password = "";
+
+ private ZooKeeper zk;
+ private String path;
+
+ private ClusterState clusterState;
+ private String statusPath;
+ private PBClusterNodeStatus memberStatus;
+ private Thread takoverTask;
+ private boolean areWeTheBestMaster;
+
+ /**
+ * Registers a listener and then notifies the registered listeners of the
+ * current cluster state (only while the manager is started).
+ */
+ synchronized public void addListener(ClusterListener listener) {
+ listeners.add(listener);
+ fireClusterChange();
+ }
+
+ /**
+ * Unregisters a listener; it receives no further cluster change events.
+ */
+ synchronized public void removeListener(ClusterListener listener) {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Stores the new cluster state and notifies the listeners.
+ */
+ synchronized private void updateClusterState(ClusterState clusterState) {
+ this.clusterState = clusterState;
+ fireClusterChange();
+ }
+
+ /**
+  * Sends the last known cluster state to every listener. Nothing is sent
+  * while the manager is stopped or before a state has been read, so
+  * listeners never receive a null state.
+  */
+ synchronized private void fireClusterChange() {
+     if (startCounter <= 0 || clusterState == null) {
+         return;
+     }
+     // Iterate over a copy: a listener may add or remove listeners from its
+     // callback, which would otherwise break the iteration.
+     for (ClusterListener listener : new ArrayList<ClusterListener>(listeners)) {
+         listener.onClusterChange(clusterState);
+     }
+ }
+
+ synchronized public void start() throws Exception {
+ startCounter++;
+ if (startCounter == 1) {
+
+ // Make sure the path is set..
+ String path = getPath();
+
+ // Create a ZooKeeper connection..
+ zk = createZooKeeperConnection();
+
+ while( isStarted() ) {
+ try {
+ mkParentDirs(path);
+ try {
+ zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (NodeExistsException ignore) {
+ }
+ processClusterStateChange();
+ return;
+ } catch (Exception e) {
+ handleZKError(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @return true while the start counter is positive
+ */
+ synchronized private boolean isStarted() {
+ return startCounter > 0;
+ }
+
+ /**
+  * Stops the manager. Calls are counted; the call that balances the first
+  * start() closes the ZooKeeper connection and clears all cached state.
+  * A stop() without a matching start() is ignored.
+  *
+  * @throws Exception if closing the ZooKeeper connection fails
+  */
+ synchronized public void stop() throws Exception {
+     if (startCounter <= 0) {
+         // Unbalanced stop: do not let the counter go negative.
+         return;
+     }
+     startCounter--;
+     if (startCounter == 0) {
+         try {
+             if (zk != null) {
+                 zk.close();
+             }
+         } finally {
+             // Reset the state even when close() fails.
+             zk = null;
+             path = null;
+             clusterState = null;
+             statusPath = null;
+             memberStatus = null;
+             takoverTask = null;
+             areWeTheBestMaster = false;
+         }
+     }
+ }
+
+
+ /**
+  * Returns the ZooKeeper path of the cluster configuration node, taken from
+  * the path component of the configured uri and cached after the first call.
+  *
+  * @return the cluster configuration path
+  * @throws IllegalArgumentException if the uri cannot be parsed or has no path
+  */
+ public String getPath() {
+     if (path == null) {
+         try {
+             URI uri = new URI(this.uri);
+             String uriPath = uri.getPath();
+             // URI.getPath() yields "" (not null) for uris such as "zk://host".
+             if (uriPath == null || uriPath.length() == 0) {
+                 throw new IllegalArgumentException("Invalid uri '" + uri + "', path to cluster configuration not specified");
+             }
+             path = uriPath;
+         } catch (URISyntaxException e) {
+             throw new IllegalArgumentException("Invalid uri '" + this.uri + "': " + e, e);
+         }
+     }
+     return path;
+ }
+
+ /**
+  * Opens a ZooKeeper connection to the host and port named by the configured
+  * uri (port 2181 when the uri has none) and adds digest credentials built
+  * from the configured user id and password.
+  *
+  * @return the new connection, with this manager as its default watcher
+  * @throws URISyntaxException if the configured uri cannot be parsed
+  * @throws IOException if the connection cannot be created
+  * @throws IllegalArgumentException if the scheme is not "zk" or the host is missing
+  */
+ public ZooKeeper createZooKeeperConnection() throws URISyntaxException, IOException {
+     // Parse out the configuration URI.
+     URI uri = new URI(this.uri);
+     // Constant first: getScheme() is null for scheme-less uris.
+     if (!"zk".equals(uri.getScheme())) {
+         throw new IllegalArgumentException("Invalid uri '" + uri + "', expected it to start with zk://");
+     }
+     String host = uri.getHost();
+     if (host == null) {
+         throw new IllegalArgumentException("Invalid uri '" + uri + "', host not specified");
+     }
+     int port = uri.getPort();
+     if (port == -1) {
+         port = 2181;
+     }
+
+     // The constructor takes (connectString, sessionTimeout, watcher). The port
+     // belongs in the connect string; it used to be passed as the timeout.
+     // NOTE(review): 5 seconds is a chosen value - confirm it suits the deployment.
+     final int sessionTimeoutMs = 5000;
+     ZooKeeper client = new ZooKeeper(host + ":" + port, sessionTimeoutMs, this);
+     // Fixed charset so the credentials do not depend on the platform default.
+     client.addAuthInfo("digest", (userid + ":" + password).getBytes("UTF-8"));
+     return client;
+ }
+
+ /**
+  * Reads the cluster configuration node, converts it into a
+  * {@link ClusterState} and publishes it. The read re-registers a watch, so
+  * this method runs again on the next change of the node. Does nothing when
+  * the manager has no connection. Failures are logged.
+  */
+ private void processClusterStateChange() {
+     // Local copies: stop() clears both fields.
+     ZooKeeper client = zk;
+     String clusterPath = path;
+     if (client == null || clusterPath == null) {
+         return;
+     }
+     try {
+         byte[] data = client.getData(clusterPath, new Watcher() {
+             public void process(WatchedEvent event) {
+                 processClusterStateChange();
+             }
+         }, new Stat());
+         PBClusterConfiguration config = new PBClusterConfiguration();
+         if (data != null) {
+             config.mergeUnframed(data);
+         }
+
+         ClusterState state = new ClusterState();
+         HashSet<String> slaves = new HashSet<String>(config.getMembersList());
+         if (config.hasMaster()) {
+             state.setMaster(config.getMaster());
+             slaves.remove(config.getMaster());
+         }
+         state.setSlaves(new ArrayList<String>(slaves));
+         updateClusterState(state);
+     } catch (Exception e) {
+         LOG.error("Failed to process cluster state change for '" + clusterPath + "'", e);
+     }
+ }
+
+ /**
+  * Default watcher callback of the ZooKeeper connection. Events are only
+  * logged; node changes are handled by the watchers registered per read.
+  */
+ public void process(WatchedEvent event) {
+     // Use the class logger rather than stdout.
+     LOG.debug("Got: " + event);
+ }
+
+ /**
+  * Publishes the status of the local node as an ephemeral sequential node
+  * below "&lt;path&gt;/election" and re-evaluates the election. A null status
+  * withdraws the node from the election. The operation is retried while the
+  * manager is started; it does nothing when the manager is stopped.
+  *
+  * @param status the new status, or null to withdraw from the election
+  */
+ public void setMemberStatus(final PBClusterNodeStatus status) {
+     while (isStarted()) {
+         try {
+             this.memberStatus = status;
+             if (status == null) {
+                 // Withdraw: delete our election node if we still have one.
+                 if (statusPath != null) {
+                     Stat stat = zk.exists(statusPath, false);
+                     if (stat != null) {
+                         zk.delete(statusPath, stat.getVersion());
+                     }
+                     statusPath = null;
+                 }
+             } else {
+                 Stat stat = (statusPath == null) ? null : zk.exists(statusPath, false);
+                 if (stat == null) {
+                     // No election node yet, or it vanished: create a new one.
+                     mkdirs(path + "/election");
+                     statusPath = zk.create(path + "/election/n_", status.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+                 } else {
+                     zk.setData(statusPath, status.toUnframedByteArray(), stat.getVersion());
+                 }
+             }
+             processElectionChange();
+             return;
+         } catch (Exception e) {
+             // handleZKError logs the failure and waits before the next attempt.
+             handleZKError(e);
+         }
+     }
+ }
+
+ /**
+  * Evaluates the election nodes below "&lt;path&gt;/election". The read
+  * re-registers a watch, so this method runs again when the children change.
+  * <p>
+  * Only the node that is first in the election order acts:
+  * an online node becomes master immediately; an offline node that looks
+  * like the best candidate schedules a delayed takeover; an offline node
+  * that is not the best candidate re-creates its election node to give up
+  * the first position. Failures are logged.
+  */
+ synchronized private void processElectionChange() {
+     try {
+         if (zk == null) {
+             return;
+         }
+         List<String> zkNodes = zk.getChildren(path + "/election", new Watcher() {
+             public void process(WatchedEvent event) {
+                 processElectionChange();
+             }
+         });
+         Map<String, PBClusterNodeStatus> children = processNodeStatus(zkNodes);
+
+         if (children.isEmpty()) {
+             return;
+         }
+         // Without a published status we are not a candidate.
+         if (memberStatus == null || statusPath == null) {
+             return;
+         }
+
+         // Are we the first child?
+         String firstNodeId = children.keySet().iterator().next();
+         if (!firstNodeId.equals(statusPath)) {
+             return;
+         }
+
+         // If we are master already no need to do anything else.
+         // The cluster state may not have been loaded yet.
+         ClusterState knownState = clusterState;
+         if (knownState != null && memberStatus.getConnectUri().equals(knownState.getMaster())) {
+             return;
+         }
+
+         // We may need to wait till a little to figure out if we are
+         // actually the best pick to be the master.
+         switch (memberStatus.getState()) {
+         case MASTER:
+         case SLAVE_ONLINE:
+             // Can transition to master immediately
+             LOG.info("Online slave taking over as master.");
+             setMaster(memberStatus.getConnectUri());
+             return;
+
+         case SLAVE_SYNCRONIZING:
+         case SLAVE_UNCONNECTED:
+
+             // If it looks like we are the best master.. lets wait 5 secs to
+             // let other slaves
+             // join the cluster and get a chance to take over..
+             if (areWeTheBestMaster(children)) {
+
+                 areWeTheBestMaster = true;
+                 if (takoverTask == null) {
+                     LOG.info(memberStatus.getConnectUri()+" looks like the best offline slave that can take over as master.. waiting 5 secs to allow another slave to take over.");
+
+                     takoverTask = new Thread("Slave takeover..") {
+                         public void run() {
+                             takoverAttempt();
+                         }
+                     };
+                     takoverTask.setDaemon(true);
+                     takoverTask.start();
+                 }
+                 return;
+
+             } else {
+                 if (areWeTheBestMaster) {
+                     LOG.info(memberStatus.getConnectUri()+" no longer looks like the best offline slave that can take over as master.");
+                 }
+
+                 areWeTheBestMaster = false;
+
+                 // If we get here we need to yield our top position in the node
+                 // sequence list so that the better
+                 // slave can become the master.
+                 Stat stat = zk.exists(statusPath, false);
+                 if (stat != null) {
+                     zk.delete(statusPath, stat.getVersion());
+                 }
+                 statusPath = zk.create(path + "/election/n_", memberStatus.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+             }
+         }
+     } catch (Exception e) {
+         LOG.error("Failed to process election change", e);
+     }
+ }
+
+ /**
+  * Body of the delayed takeover thread. Waits about five seconds (giving up
+  * early when the manager is stopped) and then, if this node still looks like
+  * the best candidate, registers it as master. Failures are logged.
+  */
+ protected void takoverAttempt() {
+     try {
+         for (int i = 0; i < 10; i++) {
+             Thread.sleep(500);
+             if (!isStarted()) {
+                 return;
+             }
+         }
+
+         synchronized (this) {
+             try {
+                 if (areWeTheBestMaster) {
+                     LOG.info(memberStatus.getConnectUri()+" is taking over as master.");
+                     setMaster(memberStatus.getConnectUri());
+                 }
+             } finally {
+                 // We want to make sure we set takoverTask to null in the same mutex as we set the master.
+                 takoverTask = null;
+             }
+         }
+     } catch (InterruptedException e) {
+         // Interrupted while waiting: give up and keep the interrupt status.
+         Thread.currentThread().interrupt();
+     } catch (Exception e) {
+         // Report the failure instead of losing it silently.
+         LOG.error("Slave takeover attempt failed", e);
+     } finally {
+         // Also covers the early exits above (stopped or interrupted).
+         synchronized (this) {
+             takoverTask = null;
+         }
+     }
+ }
+
+ private boolean areWeTheBestMaster(Map<String, PBClusterNodeStatus> children) {
+ Location ourLocation = ReplicationSupport.convert(memberStatus.getLastUpdate());
+ for (Entry<String, PBClusterNodeStatus> entry : children.entrySet()) {
+ PBClusterNodeStatus status = entry.getValue();
+ switch (status.getState()) {
+ case MASTER:
+ case SLAVE_ONLINE:
+ return false;
+
+ case SLAVE_SYNCRONIZING:
+ case SLAVE_UNCONNECTED:
+ if (ourLocation.compareTo(ReplicationSupport.convert(status.getLastUpdate())) < 0) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private Map<String, PBClusterNodeStatus> processNodeStatus(List<String> children) throws KeeperException, InterruptedException, InvalidProtocolBufferException {
+ java.util.TreeMap<String, PBClusterNodeStatus> rc = new java.util.TreeMap<String, PBClusterNodeStatus>();
+ for (String nodeId : children) {
+ try {
+ Stat stat = new Stat();
+ byte[] data = zk.getData(path + "/election/" + nodeId, false, stat);
+ PBClusterNodeStatus status = new PBClusterNodeStatus();
+ status.mergeUnframed(data);
+ rc.put(path + "/election/" + nodeId, status);
+ } catch (NoNodeException ignore) {
+ }
+ }
+ return rc;
+ }
+
+ /**
+  * Adds a node to the member list stored in the cluster configuration node.
+  * The operation is retried while the manager is started; it does nothing
+  * when the manager is stopped.
+  *
+  * @param node the member to add
+  */
+ public void addMember(final String node) {
+     while (isStarted()) {
+         try {
+             mkParentDirs(path);
+             update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+                 public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                     PBClusterConfiguration config = new PBClusterConfiguration();
+                     if (data != null) {
+                         config.mergeUnframed(data);
+                     }
+                     if (!config.getMembersList().contains(node)) {
+                         config.addMembers(node);
+                     }
+                     // Readers parse this node with mergeUnframed, so it has
+                     // to be written unframed as well.
+                     return config.toUnframedByteArray();
+                 }
+             });
+             return;
+         } catch (Exception e) {
+             handleZKError(e);
+         }
+     }
+ }
+
+ /**
+  * Removes a node from the member list stored in the cluster configuration
+  * node. The operation is retried while the manager is started; it does
+  * nothing when the manager is stopped.
+  *
+  * @param node the member to remove
+  */
+ public void removeMember(final String node) {
+     while (isStarted()) {
+         try {
+             mkParentDirs(path);
+             update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+                 public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                     PBClusterConfiguration config = new PBClusterConfiguration();
+                     if (data != null) {
+                         config.mergeUnframed(data);
+                     }
+                     config.getMembersList().remove(node);
+                     // Readers parse this node with mergeUnframed, so it has
+                     // to be written unframed as well.
+                     return config.toUnframedByteArray();
+                 }
+             });
+             return;
+         } catch (Exception e) {
+             handleZKError(e);
+         }
+     }
+ }
+
+ /**
+  * Records the given node as master in the cluster configuration node.
+  * Unlike addMember and removeMember this is a single attempt; failures are
+  * propagated to the caller.
+  *
+  * @param node the master to record
+  */
+ public void setMaster(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
+     mkParentDirs(path);
+     update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+         public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+             PBClusterConfiguration config = new PBClusterConfiguration();
+             if (data != null) {
+                 config.mergeUnframed(data);
+             }
+             config.setMaster(node);
+             // Readers parse this node with mergeUnframed, so it has to be
+             // written unframed as well.
+             return config.toUnframedByteArray();
+         }
+     });
+ }
+
+ /**
+ * Callback used by update(...) to compute the new content of a node.
+ * It receives the current content, or null when the node does not exist
+ * yet, and returns the content to store.
+ */
+ interface Updater<T extends Throwable> {
+ byte[] update(byte[] data) throws T;
+ }
+
+ /**
+  * Read-modify-write of one node. An existing node is read, passed through
+  * the updater and written back using the version that was read. A missing
+  * node is created from the updater's result for null; if another client
+  * creates it first, the existing-node path is taken instead.
+  *
+  * @param path the node to update
+  * @param persistent the create mode used when the node has to be created
+  * @param updater computes the new content from the current content
+  */
+ private <T extends Throwable> void update(String path, CreateMode persistent, Updater<T> updater) throws InvalidProtocolBufferException, KeeperException, InterruptedException, T {
+     Stat stat = zk.exists(path, false);
+     if (stat == null) {
+         byte[] initial = updater.update(null);
+         try {
+             zk.create(path, initial, Ids.OPEN_ACL_UNSAFE, persistent);
+             return;
+         } catch (NodeExistsException ignore) {
+             // Lost the creation race: fall through and update the existing node.
+             stat = zk.exists(path, false);
+         }
+     }
+     byte[] current = zk.getData(path, false, stat);
+     byte[] replacement = updater.update(current);
+     zk.setData(path, replacement, stat.getVersion());
+ }
+
+ /**
+  * Creates all ancestors of the given node path. A top-level path such as
+  * "/cluster" has only the root as parent, so there is nothing to create.
+  *
+  * @param path an absolute node path
+  */
+ private void mkParentDirs(String path) throws KeeperException, InterruptedException {
+     int lastSlash = path.lastIndexOf("/");
+     // > 0, not >= 0: index 0 would yield an empty parent path, which would
+     // then be passed on to ZooKeeper.
+     if (lastSlash > 0) {
+         mkdirs(path.substring(0, lastSlash));
+     }
+ }
+
+ /**
+  * Creates the given node and all of its ancestors as empty persistent
+  * nodes. Returns at once when the node already exists; ancestors that
+  * already exist are skipped.
+  *
+  * @param path the node path to create
+  */
+ private void mkdirs(String path) throws KeeperException, InterruptedException {
+     if (zk.exists(path, false) != null) {
+         return;
+     }
+     // Remove the leading /
+     String relative = path.startsWith("/") ? path.substring(1) : path;
+     StringBuilder current = new StringBuilder();
+     for (String segment : relative.split("/")) {
+         current.append("/").append(segment);
+         try {
+             zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+         } catch (NodeExistsException ignore) {
+         }
+     }
+ }
+
+
+ /**
+ * @return the configured cluster uri, of the form zk://host:port/path
+ */
+ public String getUri() {
+ return uri;
+ }
+
+ /**
+ * @param uri the cluster uri; the scheme must be zk and the path names the
+ * cluster configuration node
+ */
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ /**
+ * @return the user id used for the digest credentials
+ */
+ public String getUserid() {
+ return userid;
+ }
+
+ /**
+ * @param userid the user id used for the digest credentials
+ */
+ public void setUserid(String userid) {
+ this.userid = userid;
+ }
+
+ /**
+ * @return the password used for the digest credentials
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * @param password the password used for the digest credentials
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+  * Logs a failed ZooKeeper operation and then waits about one second, in
+  * ten short steps, before the caller retries. The wait ends early when the
+  * manager is stopped or the thread is interrupted.
+  *
+  * @param e the failure that triggered the retry
+  */
+ private void handleZKError(Exception e) {
+     LOG.warn("ZooKeeper error. Will retry operation in 1 seconds");
+     LOG.debug("The error was: "+e, e);
+
+     int step = 0;
+     while (step < 10) {
+         try {
+             if (!isStarted()) {
+                 return;
+             }
+             Thread.sleep(100);
+         } catch (InterruptedException interrupted) {
+             return;
+         }
+         step++;
+     }
+ }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/proto/kahadb-replication.proto Fri Dec 11 19:39:58 2009
@@ -0,0 +1,147 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+package org.apache.kahadb.replication.pb;
+
+option java_multiple_files = true;
+option java_outer_classname = "PB";
+
+//
+// Header message carrying a required PBType and an optional payload size.
+// The @followed-by notes on each PBType value name what comes after the header.
+// NOTE(review): payload_size presumably is the byte length of that following
+// payload -- confirm against the wire format implementation.
+//
+message PBHeader {
+ required PBType type=1;
+ optional int64 payload_size=2;
+}
+
+enum PBType {
+ 
+ // Sent from the slave to the master when the slave first starts. It lets the master
+ // know about the slave's synchronization state. This allows the master to decide how to best
+ // synchronize the slave.
+ //
+ // @followed-by PBSlaveInit
+ SLAVE_INIT = 0;
+ 
+ // The Master will send this response back to the slave, letting it know what it needs to do to get
+ // its data files synchronized with the master.
+ //
+ // @followed-by PBSlaveInitResponse
+ SLAVE_INIT_RESPONSE = 1;
+ 
+ // The Slave will send this command to the master once he has completed
+ // all his bulk synchronizations and he is ready to take over as being a master.
+ //
+ // @followed-by null
+ SLAVE_ONLINE=2;
+ 
+ // Sent from the Master to the slave to replicate a Journal update.
+ //
+ // @followed-by PBJournalUpdate
+ JOURNAL_UPDATE=3;
+ 
+ // An ack sent back to the master in response to a received
+ // JOURNAL_UPDATE
+ //
+ // @followed-by PBJournalLocation
+ JOURNAL_UPDATE_ACK=4;
+ 
+ // A Request for a bulk file transfer. Sent from a slave to a Master
+ //
+ // @followed-by PBFileInfo
+ FILE_TRANSFER=5;
+ 
+ // A bulk file transfer response
+ //
+ // @followed-by the bytes of the requested file.
+ FILE_TRANSFER_RESPONSE=6;
+}
+
+// Describes a file by name. It follows a FILE_TRANSFER header and is also
+// listed in PBSlaveInit.current_files and PBSlaveInitResponse.copy_files.
+// NOTE(review): start/end presumably delimit a byte range of the file and
+// checksum covers that range -- confirm against the code that fills them in.
+message PBFileInfo {
+ required string name=1;
+ optional int32 snapshot_id=2;
+ optional sfixed64 checksum=3;
+ optional int64 start=4;
+ optional int64 end=5;
+}
+
+// A journal position given as a file id plus an offset. It follows a
+// JOURNAL_UPDATE_ACK header and is embedded in PBJournalUpdate.location and
+// PBClusterNodeStatus.last_update.
+message PBJournalLocation {
+ required int32 file_id=1;
+ required int32 offset=2;
+}
+
+// Payload that follows a SLAVE_INIT header (sent from the slave to the master).
+message PBSlaveInit {
+ // The id of the slave node that is being initialized
+ required string node_id=1;
+ // The files that the slave node currently has
+ repeated PBFileInfo current_files=2;
+}
+
+// Payload that follows a SLAVE_INIT_RESPONSE header (sent from the master to the slave).
+message PBSlaveInitResponse {
+ // The files that the slave should bulk copy from the master.
+ repeated PBFileInfo copy_files=1;
+ // The files that the slave should delete
+ repeated string delete_files=2;
+}
+
+// Payload that follows a JOURNAL_UPDATE header (sent from the master to the slave).
+message PBJournalUpdate {
+ // Journal location of the update.
+ required PBJournalLocation location=1;
+ // The data that will be written at that location.
+ required bytes data=2;
+ // Should the slave send back an ack for this update.
+ optional bool send_ack=3;
+ // If true, then the slave should do a disk sync before returning a
+ // JOURNAL_UPDATE_ACK
+ optional bool disk_sync=4;
+}
+
+//
+// This holds the cluster-wide configuration: an optional broker configuration
+// blob, the list of nodes that have joined the cluster, and the last elected master.
+//
+message PBClusterConfiguration {
+ // Would be nice if the configuration of the broker was setup cluster wide. We could
+ // stuff the spring config in here.. That way pushing out changes to the rest of the
+ // cluster would be very easy.
+ optional bytes broker_configuration=1;
+ // Who are the nodes that have joined the cluster. They may not all be online.
+ // Comes in handy to see if there are enough online members to form a quorum.
+ repeated string members=2;
+ // Who was the last elected master.
+ optional string master=3;
+}
+
+// Status of a single cluster node: its replication state, an optional
+// connect URI and an optional journal location of its last update.
+message PBClusterNodeStatus {
+ 
+ // Replication state of the node. The numeric value 2 is not assigned.
+ // NOTE(review): "SYNCRONIZING" is misspelled, but the constant name is part
+ // of the generated API, so renaming it would break existing users.
+ enum State {
+ // When the slave initially starts up it
+ // is not connected to a master.
+ SLAVE_UNCONNECTED = 0;
+ // When the slave first attaches to a master, it must first
+ // synchronize with the master to get any data updates
+ // that were missed while he was offline.
+ SLAVE_SYNCRONIZING = 1;
+ // The slave is caught up and is only actively applying
+ // real time updates from the master.
+ SLAVE_ONLINE = 3;
+ // This node is the master.
+ MASTER = 4;
+ }
+ 
+ required State state=1;
+ // NOTE(review): presumably the URI other nodes use to reach this node -- confirm.
+ optional string connect_uri=2;
+ optional PBJournalLocation last_update=3;
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha-broker.xml Fri Dec 11 19:39:58 2009
@@ -0,0 +1,153 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+ http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+ <!-- Allows us to use system properties as variables in this configuration file -->
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="locations">
+ <value>file:${activemq.base}/conf/credentials.properties</value>
+ </property>
+ </bean>
+
+ <broker xmlns="http://activemq.apache.org/schema/core"
+ start="false"
+ brokerName="localhost" dataDirectory="${activemq.base}/data">
+
+ <!-- Destination specific policies using destination names or wildcards -->
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue=">" memoryLimit="5mb"/>
+ <policyEntry topic=">" memoryLimit="5mb">
+ <!-- you can add other policies too such as these
+ <dispatchPolicy>
+ <strictOrderDispatchPolicy/>
+ </dispatchPolicy>
+ <subscriptionRecoveryPolicy>
+ <lastImageSubscriptionRecoveryPolicy/>
+ </subscriptionRecoveryPolicy>
+ -->
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+ <!-- Use the following to configure how ActiveMQ is exposed in JMX -->
+ <managementContext>
+ <managementContext createConnector="false"/>
+ </managementContext>
+
+ <!-- The store and forward broker networks ActiveMQ will listen to -->
+ <networkConnectors>
+ <!-- by default just auto discover the other brokers -->
+ <networkConnector name="default-nc" uri="multicast://default"/>
+ <!-- Example of a static configuration:
+ <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
+ -->
+ </networkConnectors>
+
+ <sslContext>
+ <sslContext keyStore="file:${activemq.base}/conf/broker.ks" keyStorePassword="password" trustStore="file:${activemq.base}/conf/broker.ts" trustStorePassword="password"/>
+ </sslContext>
+
+ <!-- The maximum amount of space the broker will use before slowing down producers -->
+ <systemUsage>
+ <systemUsage>
+ <memoryUsage>
+ <memoryUsage limit="20 mb"/>
+ </memoryUsage>
+ <storeUsage>
+ <storeUsage limit="1 gb" name="foo"/>
+ </storeUsage>
+ <tempUsage>
+ <tempUsage limit="100 mb"/>
+ </tempUsage>
+ </systemUsage>
+ </systemUsage>
+
+ <!-- The transport connectors ActiveMQ will listen to -->
+ <transportConnectors>
+ <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
+ <transportConnector name="ssl" uri="ssl://localhost:61617"/>
+ <transportConnector name="stomp" uri="stomp://localhost:61613"/>
+ <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
+ </transportConnectors>
+
+ </broker>
+
+ <!--
+ ** Lets deploy some Enterprise Integration Patterns inside the ActiveMQ Message Broker
+ ** For more details see
+ **
+ ** http://activemq.apache.org/enterprise-integration-patterns.html
+ -->
+ <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+
+ <!-- You can use a <package> element for each root package to search for Java routes -->
+ <package>org.foo.bar</package>
+
+ <!-- You can use Spring XML syntax to define the routes here using the <route> element -->
+ <route>
+ <from uri="activemq:example.A"/>
+ <to uri="activemq:example.B"/>
+ </route>
+ </camelContext>
+
+ <!--
+ ** Lets configure some Camel endpoints
+ **
+ ** http://activemq.apache.org/camel/components.html
+ -->
+
+ <!-- configure the camel activemq component to use the current broker -->
+ <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
+ <property name="connectionFactory">
+ <bean class="org.apache.activemq.ActiveMQConnectionFactory">
+ <property name="brokerURL" value="vm://localhost?create=false&amp;waitForStart=10000" />
+ <property name="userName" value="${activemq.username}"/>
+ <property name="password" value="${activemq.password}"/>
+ </bean>
+ </property>
+ </bean>
+
+ <!-- Uncomment to create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
+ <!--
+ <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost" username="${activemq.username}" password="${activemq.password}"/>
+ -->
+
+ <!-- An embedded servlet engine for serving up the Admin console -->
+ <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
+ <connectors>
+ <nioConnector port="8161"/>
+ </connectors>
+
+ <handlers>
+ <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
+ <webAppContext contextPath="/demo" resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
+ <webAppContext contextPath="/fileserver" resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
+ </handlers>
+ </jetty>
+
+</beans>
+<!-- END SNIPPET: example -->
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/release/conf/ha.xml Fri Dec 11 19:39:58 2009
@@ -0,0 +1,53 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:kdb="http://activemq.apache.org/schema/kahadb"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+ http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/kahadb http://activemq.apache.org/schema/kahadb/kahadb.xsd">
+
+ <!-- Allows us to use system properties as variables in this configuration file -->
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="locations">
+ <value>file:${activemq.base}/conf/credentials.properties</value>
+ </property>
+ </bean>
+
+ <kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
+ <replicationService>
+ <kahadbReplication
+ directory="${activemq.base}/data/kahadb"
+ brokerURI="xbean:ha-broker.xml"
+ uri="kdbr://localhost:60001"
+ minimumReplicas="1">
+
+ <cluster>
+ <zookeeperCluster uri="zk://localhost:2181/activemq/default-ha-group" userid="activemq" password=""/>
+ </cluster>
+
+ </kahadbReplication>
+ </replicationService>
+ </kahadbReplicationBroker>
+
+</beans>
+<!-- END SNIPPET: example -->
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/transport/kdbr Fri Dec 11 19:39:58 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRTransportFactory
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/main/resources/META-INF/services/org/apache/activemq/wireformat/kdbr Fri Dec 11 19:39:58 2009
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.kahadb.replication.transport.KDBRWireFormatFactory
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/eclipse-resources/log4j.properties Fri Dec 11 19:39:58 2009
@@ -0,0 +1,36 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for eclipse testing, We want to see debug output on the console.
+#
+log4j.rootLogger=INFO, out
+
+
+
+# CONSOLE appender (attached to the root logger above)
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.fout=org.apache.log4j.FileAppender
+log4j.appender.fout.layout=org.apache.log4j.PatternLayout
+log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.fout.file=target/amq-testlog.log
+log4j.appender.fout.append=true
\ No newline at end of file
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/ReplicationTest.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.util.Arrays;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * Drives two {@link ReplicationService} instances through a master/slave
+ * hand-over. The roles are dictated by a {@link StaticClusterStateManager},
+ * so the test decides explicitly which node is master at each step.
+ */
+public class ReplicationTest extends TestCase {
+
+    private static final String BROKER1_URI = "tcp://localhost:61001";
+    private static final String BROKER2_URI = "tcp://localhost:61002";
+
+    private static final String BROKER1_REPLICATION_ID = "kdbr://localhost:60001";
+    private static final String BROKER2_REPLICATION_ID = "kdbr://localhost:60002";
+
+    private Destination destination = new ActiveMQQueue("TEST_QUEUE");
+
+    /**
+     * Sends 100 messages to broker 1 as master, attaches broker 2 as slave,
+     * sends 100 more, then promotes broker 2 and expects all 200 messages
+     * to be available there.
+     *
+     * @throws Exception if a service fails to start or stop, or the test is interrupted
+     */
+    public void testReplication() throws Exception {
+
+        // This cluster object will control who becomes the master.
+        StaticClusterStateManager cluster = new StaticClusterStateManager();
+
+        ReplicationService rs1 = createService(cluster, BROKER1_REPLICATION_ID, "broker1", BROKER1_URI);
+        rs1.start();
+        // The try/finally nesting guarantees that started services are stopped
+        // (rs2 first, then rs1) even when an assertion below fails; otherwise
+        // they would keep their ports and data directories for later tests.
+        try {
+            ReplicationService rs2 = createService(cluster, BROKER2_REPLICATION_ID, "broker2", BROKER2_URI);
+            rs2.start();
+            try {
+                // TODO: also verify that neither broker accepts connections
+                // before a master has been chosen.
+
+                // Make b1 the master.
+                ClusterState clusterState = new ClusterState();
+                clusterState.setMaster(BROKER1_REPLICATION_ID);
+                cluster.setClusterState(clusterState);
+
+                try {
+                    sendMesagesTo(BROKER1_URI, 100, "Pass 1: ");
+                } catch (JMSException e) {
+                    fail("b1 did not become a master: " + e);
+                }
+
+                // Make broker 2 a slave.
+                clusterState = new ClusterState();
+                clusterState.setMaster(BROKER1_REPLICATION_ID);
+                String[] slaves = {BROKER2_REPLICATION_ID};
+                clusterState.setSlaves(Arrays.asList(slaves));
+                cluster.setClusterState(clusterState);
+
+                Thread.sleep(1000);
+
+                try {
+                    sendMesagesTo(BROKER1_URI, 100, "Pass 2: ");
+                } catch (JMSException e) {
+                    fail("Failed to send more messages: " + e);
+                }
+
+                Thread.sleep(2000);
+
+                // Make broker 2 the master.
+                clusterState = new ClusterState();
+                clusterState.setMaster(BROKER2_REPLICATION_ID);
+                cluster.setClusterState(clusterState);
+
+                Thread.sleep(1000);
+
+                assertReceived(200, BROKER2_URI);
+            } finally {
+                rs2.stop();
+            }
+        } finally {
+            rs1.stop();
+        }
+    }
+
+    /**
+     * Builds (but does not start) a replication service for one broker.
+     *
+     * @param cluster       the cluster state manager shared by all services
+     * @param replicationId the replication URI of the service
+     * @param name          the broker name, also used as its data sub-directory
+     * @param brokerUri     the transport URI the embedded broker listens on
+     * @return the configured service
+     * @throws Exception if the service rejects one of the settings
+     */
+    private ReplicationService createService(StaticClusterStateManager cluster, String replicationId,
+            String name, String brokerUri) throws Exception {
+        ReplicationService service = new ReplicationService();
+        service.setMinimumReplicas(0);
+        service.setUri(replicationId);
+        service.setCluster(cluster);
+        service.setDirectory(new File("target/replication-test/" + name));
+        service.setBrokerURI("broker://(" + brokerUri + ")/" + name);
+        return service;
+    }
+
+    /**
+     * Consumes {@code count} text messages from the test queue and fails the
+     * test as soon as one does not arrive within a second.
+     *
+     * @param count     the number of messages expected
+     * @param brokerUri the broker to consume from
+     * @throws JMSException if the connection or consumer cannot be used
+     */
+    private void assertReceived(int count, String brokerUri) throws JMSException {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+        Connection con = cf.createConnection();
+        try {
+            // Started inside the try block so the connection is closed even if start() fails.
+            con.start();
+            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+            for (int i = 0; i < count; i++) {
+                TextMessage m = (TextMessage) consumer.receive(1000);
+                if (m == null) {
+                    fail("Failed to receive message: "+i);
+                }
+                System.out.println("Got: "+m.getText());
+            }
+        } finally {
+            try {
+                con.close();
+            } catch (Throwable ignored) {
+                // Best-effort close; a close failure must not hide the test result.
+            }
+        }
+    }
+
+    /**
+     * Sends {@code count} text messages to the test queue; message {@code i}
+     * carries the text {@code msg + i}.
+     *
+     * @param brokerUri the broker to send to
+     * @param count     the number of messages to send
+     * @param msg       the text prefix of every message
+     * @throws JMSException if connecting or sending fails
+     */
+    private void sendMesagesTo(String brokerUri, int count, String msg) throws JMSException {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+        Connection con = cf.createConnection();
+        try {
+            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < count; i++) {
+                producer.send(session.createTextMessage(msg+i));
+            }
+        } finally {
+            try {
+                con.close();
+            } catch (Throwable ignored) {
+                // Best-effort close; a close failure must not hide the test result.
+            }
+        }
+    }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/StaticClusterStateManager.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.util.ArrayList;
+
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+
+/**
+ * A {@link ClusterStateManager} whose cluster state is assigned directly by
+ * the caller. Registered listeners are notified whenever the state is set,
+ * a listener is added or the manager is started, provided that the manager
+ * has been started, has at least one listener and holds a non-null state.
+ * The member-related operations do nothing.
+ */
+public class StaticClusterStateManager implements ClusterStateManager {
+
+    private final ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
+    private ClusterState clusterState;
+    // Incremented by start() and decremented by stop().
+    private int startCounter;
+
+    /**
+     * @return the state last passed to {@link #setClusterState(ClusterState)}
+     */
+    public synchronized ClusterState getClusterState() {
+        return clusterState;
+    }
+
+    /**
+     * Replaces the cluster state and notifies the listeners.
+     *
+     * @param clusterState the new state
+     */
+    public synchronized void setClusterState(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        fireClusterChange();
+    }
+
+    /**
+     * Registers a listener and then notifies all listeners of the current state.
+     *
+     * @param listener the listener to register
+     */
+    public synchronized void addListener(ClusterListener listener) {
+        listeners.add(listener);
+        fireClusterChange();
+    }
+
+    /**
+     * @param listener the listener to unregister
+     */
+    public synchronized void removeListener(ClusterListener listener) {
+        listeners.remove(listener);
+    }
+
+    /**
+     * Increments the start counter and notifies the listeners.
+     */
+    public synchronized void start() throws Exception {
+        startCounter++;
+        fireClusterChange();
+    }
+
+    /**
+     * Decrements the start counter.
+     */
+    public synchronized void stop() throws Exception {
+        startCounter--;
+    }
+
+    /** Does nothing. */
+    public void addMember(String node) {
+    }
+
+    /** Does nothing. */
+    public void removeMember(String node) {
+    }
+
+    /** Does nothing. */
+    public void setMemberStatus(PBClusterNodeStatus status) {
+    }
+
+    /**
+     * Delivers the current state to every listener, unless the manager is
+     * not started, has no listeners or has no state yet.
+     */
+    private synchronized void fireClusterChange() {
+        if (startCounter <= 0 || listeners.isEmpty() || clusterState == null) {
+            return;
+        }
+        for (ClusterListener each : listeners) {
+            each.onClusterChange(clusterState);
+        }
+    }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/XBeanReplicationTest.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.kahadb.replication.zk.ZooKeeperClusterStateManager;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+public class XBeanReplicationTest extends TestCase {
+
+
+ private static final String BROKER1_URI = "tcp://localhost:61616";
+ private static final String BROKER2_URI = "tcp://localhost:61617";
+
+ private Destination destination = new ActiveMQQueue("TEST_QUEUE");
+
+ private static final int PORT = 2181;
+ private Factory serverFactory;
+
+ public void testReplication() throws Exception {
+
+ startZooKeeper();
+
+ BrokerService broker1 = BrokerFactory.createBroker("xbean:broker1/ha.xml");
+ broker1.start();
+
+ // Wait for the broker to get setup..
+ Thread.sleep(7000);
+
+ sendMesagesTo(BROKER1_URI, 100, "Pass 1: ");
+
+ // Create broker 2 which will join in and sync up with the existing master.
+ BrokerService broker2 = BrokerFactory.createBroker("xbean:broker2/ha.xml");
+ broker2.start();
+
+ // Give it some time to sync up..
+ Thread.sleep(1000);
+
+ // Stopping broker1 should make broker2 the master.
+ broker1.stop();
+
+ Thread.sleep(1000);
+
+ // Did all the messages get synced up?
+ assertReceived(100, BROKER2_URI);
+ // Send some more message...
+ sendMesagesTo(BROKER2_URI, 50, "Pass 2: ");
+
+ // Start broker1 up again.. it should re-sync with master 2
+ broker1.start();
+ // Give it some time to sync up..
+ Thread.sleep(1000);
+
+ // stopping the master..
+ broker2.stop();
+
+ // Did the new state get synced right?
+ assertReceived(50, BROKER1_URI);
+
+ broker1.stop();
+
+ stopZooKeeper();
+
+ }
+
+ private void stopZooKeeper() {
+ serverFactory.shutdown();
+ ServerStats.unregister();
+ }
+
+ private void startZooKeeper() throws IOException, InterruptedException, URISyntaxException {
+ ServerStats.registerAsConcrete();
+ File zooKeeperData = new File("target/test-data/zookeeper-"+System.currentTimeMillis());
+ zooKeeperData.mkdirs();
+
+ // Reduces startup time..
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100);
+ ZooKeeperServer zs = new ZooKeeperServer(zooKeeperData, zooKeeperData, 3000);
+
+ serverFactory = new NIOServerCnxn.Factory(PORT);
+ serverFactory.startup(zs);
+
+ ZooKeeperClusterStateManager zkcsm1 = new ZooKeeperClusterStateManager();
+ zkcsm1.setUri("zk://localhost:"+PORT+"/");
+ ZooKeeper zk = zkcsm1.createZooKeeperConnection();
+
+ // Wait till the ZK client gets connected..
+ States state;
+ while( (state = zk.getState()) != States.CONNECTED ) {
+ Thread.sleep(100);
+ }
+ zk.close();
+
+ }
+
+ private void assertReceived(int count, String brokerUri) throws JMSException {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+ Connection con = cf.createConnection();
+ con.start();
+ try {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+ for (int i = 0; i < count; i++) {
+ TextMessage m = (TextMessage) consumer.receive(1000);
+ if( m==null ) {
+ fail("Failed to receive message: "+i);
+ }
+ System.out.println("Got: "+m.getText());
+ }
+ } finally {
+ try { con.close(); } catch (Throwable e) {}
+ }
+ }
+
+ private void sendMesagesTo(String brokerUri, int count, String msg) throws JMSException {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+ Connection con = cf.createConnection();
+ try {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < count; i++) {
+ producer.send(session.createTextMessage(msg+i));
+ }
+ } finally {
+ try { con.close(); } catch (Throwable e) {}
+ }
+ }
+
+}
Added: activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java?rev=889781&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-kahadb-replication/src/test/java/org/apache/kahadb/replication/blaze/BlazeClusterStateManagerTest.java Fri Dec 11 19:39:58 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.blaze;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.apache.activemq.util.Callback;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+
+/**
+ * Exercises {@link BlazeClusterStateManager} by starting one or two managers
+ * in-process, queueing every {@link ClusterState} delivered to their
+ * listeners, and checking the reported master and slaves as members are
+ * added and report their status.
+ *
+ * @author rajdavies
+ */
+public class BlazeClusterStateManagerTest extends TestCase {
+
+    /**
+     * Starts two managers, adds one member to each, then reports both members
+     * as {@code SLAVE_UNCONNECTED} from background threads (node 2 one second
+     * after node 1). The next state queued for each manager must name node 2
+     * as master and contain exactly one slave.
+     *
+     * @throws Exception if a manager fails to start or stop, or a poll is interrupted
+     */
+    public void testTwoNodesGoingOnline() throws Exception {
+        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+        final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();
+        final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
+        bcsm1.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents1.add(config);
+            }
+        });
+        bcsm1.start();
+        try {
+            bcsm1.addMember("kdbr://localhost:60001");
+            final BlazeClusterStateManager bcsm2 = new BlazeClusterStateManager();
+            bcsm2.addListener(new ClusterListener() {
+                public void onClusterChange(ClusterState config) {
+                    stateEvents2.add(config);
+                }
+            });
+            bcsm2.start();
+            try {
+                bcsm2.addMember("kdbr://localhost:60002");
+                // Drain the events..
+                drain(stateEvents1);
+                drain(stateEvents2);
+                // Bring node 1 online
+                final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
+                status1.setConnectUri("kdbr://localhost:60001");
+                status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
+                status1.setState(State.SLAVE_UNCONNECTED);
+                executeAsync(new Callback() {
+                    public void execute() throws Exception {
+                        bcsm1.setMemberStatus(status1);
+                    }
+                });
+                // Bring node 2 online
+                final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
+                status2.setConnectUri("kdbr://localhost:60002");
+                status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
+                status2.setState(State.SLAVE_UNCONNECTED);
+                executeAsync(new Callback() {
+                    public void execute() throws Exception {
+                        Thread.sleep(1000);
+                        bcsm2.setMemberStatus(status2);
+                    }
+                });
+                // NOTE(review): node 2 is expected to win; presumably because its
+                // last update (file 2) is ahead of node 1's (file 1) - confirm
+                // against the election logic in BlazeClusterStateManager.
+                ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
+                assertNotNull(state);
+                assertNotNull(state.getMaster());
+                assertEquals("kdbr://localhost:60002", state.getMaster());
+                assertEquals(1, state.getSlaves().size());
+                state = stateEvents2.poll(2, TimeUnit.SECONDS);
+                assertNotNull(state);
+                assertNotNull(state.getMaster());
+                assertEquals("kdbr://localhost:60002", state.getMaster());
+                assertEquals(1, state.getSlaves().size());
+            } finally {
+                // Stop even when an assertion failed so the manager does not
+                // outlive this test.
+                bcsm2.stop();
+            }
+        } finally {
+            bcsm1.stop();
+        }
+    }
+
+    /**
+     * Starts a single manager and checks two transitions: adding the member
+     * yields a state with no master and one slave; reporting that member as
+     * {@code SLAVE_UNCONNECTED} yields a state where it is master and the
+     * slave list is empty.
+     *
+     * @throws Exception if the manager fails to start or stop, or a poll is interrupted
+     */
+    public void testOneNodeGoingOnline() throws Exception {
+        final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+        final BlazeClusterStateManager bcsm1 = new BlazeClusterStateManager();
+        bcsm1.addListener(new ClusterListener() {
+            public void onClusterChange(ClusterState config) {
+                stateEvents1.add(config);
+            }
+        });
+        bcsm1.start();
+        try {
+            // Drain the events..
+            drain(stateEvents1);
+            // Let node1 join the cluster.
+            bcsm1.addMember("kdbr://localhost:60001");
+            ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
+            assertNotNull(state);
+            assertNull(state.getMaster());
+            assertEquals(1, state.getSlaves().size());
+            // Let the cluster know that node1 is online..
+            PBClusterNodeStatus status = new PBClusterNodeStatus();
+            status.setConnectUri("kdbr://localhost:60001");
+            status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
+            status.setState(State.SLAVE_UNCONNECTED);
+            bcsm1.setMemberStatus(status);
+            state = stateEvents1.poll(10, TimeUnit.SECONDS);
+            assertNotNull(state);
+            assertNotNull(state.getMaster());
+            assertEquals("kdbr://localhost:60001", state.getMaster());
+            assertTrue(state.getSlaves().isEmpty());
+        } finally {
+            // Stop even when an assertion failed so the manager does not
+            // outlive this test.
+            bcsm1.stop();
+        }
+    }
+
+    /**
+     * Discards queued events until the queue has stayed empty for 100 ms.
+     *
+     * @param events queue to empty
+     * @throws InterruptedException if interrupted while waiting on the queue
+     */
+    private static void drain(LinkedBlockingQueue<ClusterState> events) throws InterruptedException {
+        while (events.poll(100, TimeUnit.MILLISECONDS) != null) {
+            // Discard: only events produced after this point matter.
+        }
+    }
+
+    /**
+     * Runs {@code callback} on a new thread named "Async Test Task". An
+     * exception thrown by the callback is printed to standard error and does
+     * not fail the calling test.
+     *
+     * @param callback work to execute in the background
+     */
+    private void executeAsync(final Callback callback) {
+        new Thread("Async Test Task") {
+            @Override
+            public void run() {
+                try {
+                    callback.execute();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+    }
+}