You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/18 01:53:32 UTC
svn commit: r718453 - in /activemq/sandbox/kahadb/src:
main/java/org/apache/kahadb/replication/
main/java/org/apache/kahadb/replication/zk/ main/proto/
test/java/org/apache/kahadb/replication/zk/ test/resources/
Author: chirino
Date: Mon Nov 17 16:53:32 2008
New Revision: 718453
URL: http://svn.apache.org/viewvc?rev=718453&view=rev
Log:
Adding an initial pass of a ZooKeeper based cluster manager.
Added:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
activemq/sandbox/kahadb/src/test/resources/log4j.properties
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=718453&r1=718452&r2=718453&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Mon Nov 17 16:53:32 2008
@@ -18,7 +18,6 @@
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -27,8 +26,6 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.zip.Adler32;
-import java.util.zip.Checksum;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
@@ -119,7 +116,7 @@
ReplicationFrame frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
PBJournalUpdate payload = new PBJournalUpdate();
- payload.setLocation(convert(location));
+ payload.setLocation(ReplicationSupport.convert(location));
payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
frame.setPayload(payload);
@@ -285,7 +282,7 @@
info.setSnapshotId(snapshotId);
info.setStart(0);
info.setEnd(file.length());
- info.setChecksum(copyAndChecksum(file, snapshotFile));
+ info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
snapshotInfos.add(info);
rcPayload.setCopyFilesList(snapshotInfos);
@@ -345,43 +342,6 @@
}
}
- private PBJournalLocation convert(Location loc) {
- if( loc==null ) {
- return null;
- }
- return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
- }
-
- private long copyAndChecksum(File input, File output) throws IOException {
- FileInputStream is = null;
- FileOutputStream os = null;
- try {
- is = new FileInputStream(input);
- os = new FileOutputStream(output);
-
- byte buffer[] = new byte[1024 * 4];
- int c;
-
- Checksum checksum = new Adler32();
- while ((c = is.read(buffer)) >= 0) {
- os.write(buffer, 0, c);
- checksum.update(buffer, 0, c);
- }
- return checksum.getValue();
-
- } finally {
- try {
- is.close();
- } catch(Throwable e) {
- }
- try {
- os.close();
- } catch(Throwable e) {
- }
- }
- }
-
-
private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation journalLocation) {
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java?rev=718453&r1=718452&r2=718453&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSlave.java Mon Nov 17 16:53:32 2008
@@ -40,7 +40,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.DataFile;
-import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.replication.pb.PBFileInfo;
import org.apache.kahadb.replication.pb.PBHeader;
@@ -293,18 +292,12 @@
if( onlineRecovery ) {
KahaDBStore store = replicationServer.getStore();
// Let the journal know that we appended to one of it's files..
- store.getJournal().appendedExternally(convert(location), data.length);
+ store.getJournal().appendedExternally(ReplicationSupport.convert(location), data.length);
// Now incrementally recover those records.
store.incrementalRecover();
}
}
- private Location convert(PBJournalLocation location) {
- Location rc = new Location();
- rc.setDataFileId(location.getFileId());
- rc.setOffset(location.getOffset());
- return rc;
- }
private void commitBulkTransfer() {
try {
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java?rev=718453&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationSupport.java Mon Nov 17 16:53:32 2008
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+
+/**
+ * Static helpers shared by the replication master and slave: conversions
+ * between journal {@link Location} objects and their protobuf form, and a
+ * file copy that also computes a checksum of the copied bytes.
+ */
+public class ReplicationSupport {
+
+    /**
+     * Converts a journal location into its protobuf representation.
+     *
+     * @param loc the journal location, may be null
+     * @return the protobuf location, or null when loc is null
+     */
+    static public PBJournalLocation convert(Location loc) {
+        if( loc==null ) {
+            return null;
+        }
+        return new PBJournalLocation().setFileId(loc.getDataFileId()).setOffset(loc.getOffset());
+    }
+
+    /**
+     * Converts a protobuf journal location into a journal {@link Location}.
+     *
+     * @param location the protobuf location, may be null
+     * @return the journal location, or null when location is null (mirrors
+     *         the null handling of the opposite conversion)
+     */
+    static public Location convert(PBJournalLocation location) {
+        if( location==null ) {
+            return null;
+        }
+        Location rc = new Location();
+        rc.setDataFileId(location.getFileId());
+        rc.setOffset(location.getOffset());
+        return rc;
+    }
+
+    /**
+     * Copies input to output and returns the Adler-32 checksum of the bytes
+     * that were copied.
+     *
+     * @param input  the file to read
+     * @param output the file to write (created or overwritten)
+     * @return the Adler-32 checksum of the copied data
+     * @throws IOException if either file cannot be opened, the copy fails, or
+     *         the output file cannot be closed after a successful copy
+     */
+    static public long copyAndChecksum(File input, File output) throws IOException {
+        FileInputStream is = new FileInputStream(input);
+        try {
+            FileOutputStream os = new FileOutputStream(output);
+            try {
+                byte buffer[] = new byte[1024 * 4];
+                int c;
+
+                Checksum checksum = new Adler32();
+                while ((c = is.read(buffer)) >= 0) {
+                    os.write(buffer, 0, c);
+                    checksum.update(buffer, 0, c);
+                }
+
+                // Close explicitly on the success path so that a failure to
+                // finish writing the copy is reported to the caller.
+                os.close();
+                return checksum.getValue();
+            } finally {
+                // No-op after a successful close; on the failure path this is
+                // best effort so the original exception is not masked.
+                try {
+                    os.close();
+                } catch (IOException ignored) {
+                }
+            }
+        } finally {
+            // Nothing useful can be done if closing the read side fails.
+            try {
+                is.close();
+            } catch (IOException ignored) {
+            }
+        }
+    }
+
+}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java?rev=718453&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java Mon Nov 17 16:53:32 2008
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.zk;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.ClusterStateManager;
+import org.apache.kahadb.replication.ReplicationSupport;
+import org.apache.kahadb.replication.pb.PBClusterConfiguration;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+public class ZooKeeperClusterStateManager implements ClusterStateManager, Watcher {
+ private static final Log LOG = LogFactory.getLog(ZooKeeperClusterStateManager.class);
+
+ final private ArrayList<ClusterListener> listeners = new ArrayList<ClusterListener>();
+ private int startCounter;
+
+ private String uri = "zk://localhost:2181/activemq/ha-cluster/default";
+ String userid = "activemq";
+ String password = "";
+
+ private ZooKeeper zk;
+ private String path;
+
+ ClusterState clusterState;
+ private String statusPath;
+ private PBClusterNodeStatus memberStatus;
+
+ private Thread takoverTask;
+
+ private boolean areWeTheBestMaster;
+
+    /**
+     * Registers a cluster listener and then fires a cluster change
+     * notification. The notification goes to every registered listener and
+     * is only delivered once the manager has been started.
+     */
+    public synchronized void addListener(ClusterListener listener) {
+        this.listeners.add(listener);
+        this.fireClusterChange();
+    }
+
+    /**
+     * Unregisters a previously added cluster listener.
+     */
+    public synchronized void removeListener(ClusterListener listener) {
+        this.listeners.remove(listener);
+    }
+
+    /**
+     * Records the latest known cluster state and notifies the listeners.
+     */
+    private synchronized void updateClusterState(ClusterState clusterState) {
+        this.clusterState = clusterState;
+        this.fireClusterChange();
+    }
+
+    /**
+     * Delivers the current cluster state to every registered listener.
+     * Nothing is delivered while the manager is stopped or when there are no
+     * listeners.
+     */
+    private synchronized void fireClusterChange() {
+        if (startCounter <= 0 || listeners.isEmpty()) {
+            return;
+        }
+        for (ClusterListener each : listeners) {
+            each.onClusterChange(clusterState);
+        }
+    }
+
+ synchronized public void start() throws Exception {
+ startCounter++;
+ if (startCounter == 1) {
+
+ // Make sure the path is set..
+ String path = getPath();
+
+ // Create a ZooKeeper connection..
+ zk = createZooKeeperConnection();
+
+ mkParentDirs(path);
+ try {
+ zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (NodeExistsException ignore) {
+ }
+
+ processClusterStateChange();
+ }
+ }
+
+    /**
+     * Returns the ZooKeeper path of the cluster configuration node, parsing
+     * it from the configured URI on first use.
+     *
+     * @return the path component of the configured URI
+     * @throws IllegalArgumentException if the URI cannot be parsed or has no
+     *         path component
+     */
+    public String getPath() {
+        if( path == null ) {
+            try {
+                String parsed = new URI(this.uri).getPath();
+                // URI.getPath() yields an empty string (not null) when a
+                // hierarchical URI has no path, so both must be rejected.
+                if (parsed == null || parsed.length() == 0) {
+                    throw new IllegalArgumentException("Invalid uri '" + uri + "', path to cluster configuration not specified");
+                }
+                // Only cache the value once it has been validated.
+                path = parsed;
+            } catch (URISyntaxException e) {
+                throw new IllegalArgumentException("Invalid uri '" + uri + "': "+e, e);
+            }
+        }
+        return path;
+    }
+
+ ZooKeeper createZooKeeperConnection() throws URISyntaxException, IOException {
+ // Parse out the configuration URI.
+ URI uri = new URI(this.uri);
+ if (!uri.getScheme().equals("zk")) {
+ throw new IllegalArgumentException("Invalid uri '" + uri + "', expected it to start with zk://");
+ }
+ String host = uri.getHost();
+ if (host == null) {
+ throw new IllegalArgumentException("Invalid uri '" + uri + "', host not specified");
+ }
+ int port = uri.getPort();
+ if (port == -1) {
+ port = 2181;
+ }
+
+ ZooKeeper zk = new ZooKeeper(host, port, this);
+ zk.addAuthInfo("digest", (userid+":"+password).getBytes());
+ return zk;
+ }
+
+    /**
+     * Reloads the cluster configuration node, re-registers a watch on it and
+     * publishes the resulting master/slave view to the listeners. Does
+     * nothing when the manager has no ZooKeeper connection.
+     */
+    private void processClusterStateChange() {
+        try {
+            if( zk==null ) {
+                return;
+            }
+
+            // ZooKeeper watches fire only once, so each read installs a new
+            // watcher that triggers the next reload.
+            byte[] data = zk.getData(path, new Watcher() {
+                public void process(WatchedEvent event) {
+                    processClusterStateChange();
+                }
+            }, new Stat());
+            PBClusterConfiguration config = new PBClusterConfiguration();
+            if (data != null) {
+                config.mergeUnframed(data);
+            }
+
+            // Every member that is not the master is reported as a slave.
+            ClusterState state = new ClusterState();
+            HashSet<String> slaves = new HashSet<String>(config.getMembersList());
+            if( config.hasMaster() ) {
+                state.setMaster(config.getMaster());
+                slaves.remove(config.getMaster());
+            }
+            state.setSlaves(new ArrayList<String>(slaves));
+            updateClusterState(state);
+        } catch (Exception e) {
+            // Runs on watcher callbacks, so there is no caller to report to.
+            LOG.error("Could not process the cluster state change: " + e, e);
+        }
+    }
+
+    /**
+     * Stops the manager. Calls are reference counted: the ZooKeeper
+     * connection is closed when the last outstanding start() is balanced.
+     * Calling stop() on a manager that is not started is a no-op.
+     *
+     * @throws Exception if closing the ZooKeeper connection fails
+     */
+    synchronized public void stop() throws Exception {
+        if (startCounter == 0) {
+            // Unbalanced stop(); a negative counter would break the next start().
+            return;
+        }
+        startCounter--;
+        if (startCounter == 0 && zk != null) {
+            try {
+                zk.close();
+            } finally {
+                // Drop the reference even if close() fails so that watcher
+                // callbacks see the manager as stopped.
+                zk = null;
+            }
+        }
+    }
+
+    /**
+     * Default watcher callback for the ZooKeeper connection created by this
+     * manager. Events are only logged.
+     */
+    public void process(WatchedEvent event) {
+        // Use the class logger rather than stdout so the output can be filtered.
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Got: " + event);
+        }
+    }
+
+    /**
+     * Publishes this member's status in its ephemeral election node and then
+     * re-evaluates the election.
+     *
+     * @param status the new status; null withdraws this member from the
+     *        election by deleting its election node
+     * @throws KeeperException if a ZooKeeper operation fails
+     * @throws InterruptedException if a ZooKeeper call is interrupted
+     */
+    public void setMemberStatus(final PBClusterNodeStatus status) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
+        this.memberStatus = status;
+        final String electionNodePrefix = path + "/election/n_";
+        if (statusPath == null) {
+            // Not registered yet. A null status means there is nothing to
+            // publish, so no election node is created.
+            if (status != null) {
+                mkdirs(path + "/election");
+                statusPath = zk.create(electionNodePrefix, status.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+            }
+        } else {
+            // exists() returns null if our election node has gone away.
+            Stat stat = zk.exists(statusPath, false);
+            if (status == null) {
+                if (stat != null) {
+                    zk.delete(statusPath, stat.getVersion());
+                }
+                statusPath = null;
+            } else if (stat != null) {
+                zk.setData(statusPath, status.toUnframedByteArray(), stat.getVersion());
+            } else {
+                // Our node vanished; register again under a new sequence number.
+                statusPath = zk.create(electionNodePrefix, status.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+            }
+        }
+        processElectionChange();
+    }
+
+ /**
+ * Re-evaluates the master election. Lists the children of the
+ * "election" node (installing a new watcher that calls this method
+ * again), loads their status, and if this member's node is the first
+ * entry decides whether to take over as master now, after a delay, or
+ * to give up its position. Does nothing when there is no ZooKeeper
+ * connection. Any failure is caught and printed; it is not rethrown.
+ */
+ synchronized private void processElectionChange() {
+ try {
+ if( zk==null ) {
+ return;
+ }
+ List<String> zkNodes = zk.getChildren(path + "/election", new Watcher() {
+ public void process(WatchedEvent event) {
+ processElectionChange();
+ }
+ });
+ // processNodeStatus returns a sorted map keyed by the full node path,
+ // so the first key is the lowest-sorting election node.
+ Map<String, PBClusterNodeStatus> children = processNodeStatus(zkNodes);
+
+ if( children.isEmpty() ) {
+ return;
+ }
+ String firstNodeId = children.keySet().iterator().next();
+
+ // If we are the first child?
+ if( firstNodeId.equals(statusPath) ) {
+
+ // If we are master already no need to do anything else
+ if ( memberStatus.getConnectUri().equals(clusterState.getMaster()) ) {
+ return;
+ }
+
+ // We may need to wait till a little to figure out if we are
+ // actually the best pick to be the master.
+ switch (memberStatus.getState()) {
+ case MASTER:
+ case SLAVE_ONLINE:
+ // Can transition to master immediately
+ LOG.info("Online salve taking over as master.");
+ setMaster(memberStatus.getConnectUri());
+ return;
+
+ case SLAVE_SYNCRONIZING:
+ case SLAVE_UNCONNECTED:
+
+ // If it looks like we are the best master.. lets wait 5 secs to
+ // let other slaves
+ // join the cluster and get a chance to take over..
+ if (areWeTheBestMaster(children)) {
+
+ areWeTheBestMaster = true;
+ // Only one takeover thread is started at a time; takoverAttempt()
+ // clears takoverTask when it finishes.
+ if( takoverTask==null ) {
+ LOG.info(memberStatus.getConnectUri()+" looks like the best offline slave that can take over as master.. waiting 5 secs to allow another slave to take over.");
+
+ takoverTask = new Thread("Slave takeover..") {
+ public void run() {
+ takoverAttempt();
+ }
+ };
+ takoverTask.setDaemon(true);
+ takoverTask.start();
+ }
+ return;
+
+ } else {
+ if( areWeTheBestMaster ) {
+ LOG.info(memberStatus.getConnectUri()+" no longer looks like the best offline slave that can take over as master.");
+ }
+
+ // Clearing the flag makes a pending takoverAttempt() skip setMaster().
+ areWeTheBestMaster = false;
+
+ // If we get here we need to yield our top position in the node
+ // sequence list so that the better
+ // slave can become the master.
+ // NOTE(review): exists() may return null if the node is already
+ // gone, which would fail on stat.getVersion() -- confirm.
+ Stat stat = zk.exists(statusPath, false);
+ zk.delete(statusPath, stat.getVersion());
+ statusPath = zk.create(path + "/election/n_", memberStatus.toUnframedByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ }
+ }
+ }
+ } catch (Exception e) {
+ // Failures are only printed; the election is not retried from here.
+ e.printStackTrace();
+ }
+ }
+
+    /**
+     * Body of the delayed takeover thread. Waits five seconds so that a
+     * better candidate gets a chance to join, then registers this member as
+     * master if it is still flagged as the best candidate. The takeover
+     * task reference is always cleared before the method returns.
+     */
+    protected void takoverAttempt() {
+        try {
+            Thread.sleep(5 * 1000);
+            synchronized(this) {
+                try {
+                    if( areWeTheBestMaster ) {
+                        LOG.info(memberStatus.getConnectUri()+" is taking over as master.");
+                        setMaster(memberStatus.getConnectUri());
+                    }
+                } finally {
+                    // We want to make sure we set takoverTask to null in the same mutex as we set the master.
+                    takoverTask=null;
+                }
+            }
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for whoever owns this thread.
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            // Report the failure instead of silently dropping the takeover.
+            LOG.error("Master takeover attempt failed: " + e, e);
+        } finally {
+            // sleep might error out..
+            synchronized(this) {
+                takoverTask=null;
+            }
+        }
+    }
+
+ private boolean areWeTheBestMaster(Map<String, PBClusterNodeStatus> children) {
+ Location ourLocation = ReplicationSupport.convert(memberStatus.getLastUpdate());
+ for (Entry<String, PBClusterNodeStatus> entry : children.entrySet()) {
+ PBClusterNodeStatus status = entry.getValue();
+ switch (status.getState()) {
+ case MASTER:
+ case SLAVE_ONLINE:
+ return false;
+
+ case SLAVE_SYNCRONIZING:
+ case SLAVE_UNCONNECTED:
+ if (ourLocation.compareTo(ReplicationSupport.convert(status.getLastUpdate())) < 0) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private Map<String, PBClusterNodeStatus> processNodeStatus(List<String> children) throws KeeperException, InterruptedException, InvalidProtocolBufferException {
+ java.util.TreeMap<String, PBClusterNodeStatus> rc = new java.util.TreeMap<String, PBClusterNodeStatus>();
+ for (String nodeId : children) {
+ try {
+ Stat stat = new Stat();
+ byte[] data = zk.getData(path + "/election/" + nodeId, false, stat);
+ PBClusterNodeStatus status = new PBClusterNodeStatus();
+ status.mergeUnframed(data);
+ rc.put(path + "/election/" + nodeId, status);
+ } catch (NoNodeException ignore) {
+ }
+ }
+ return rc;
+ }
+
+    /**
+     * Adds a node to the member list stored in the cluster configuration
+     * node, creating the configuration node if needed. Adding a node that
+     * is already a member leaves the list unchanged.
+     *
+     * @param node the identifier of the member to add
+     */
+    public void addMember(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
+        mkParentDirs(path);
+        update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+            public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                PBClusterConfiguration config = new PBClusterConfiguration();
+                if (data != null) {
+                    config.mergeUnframed(data);
+                }
+                if (!config.getMembersList().contains(node)) {
+                    config.addMembers(node);
+                }
+                // Write the same unframed encoding that every reader of this
+                // node parses with mergeUnframed().
+                return config.toUnframedByteArray();
+            }
+        });
+    }
+
+    /**
+     * Records the given node as the elected master in the cluster
+     * configuration node, creating the configuration node if needed.
+     *
+     * @param node the identifier of the new master
+     */
+    public void setMaster(final String node) throws InvalidProtocolBufferException, KeeperException, InterruptedException {
+        mkParentDirs(path);
+        update(path, CreateMode.PERSISTENT, new Updater<InvalidProtocolBufferException>() {
+            public byte[] update(byte[] data) throws InvalidProtocolBufferException {
+                PBClusterConfiguration config = new PBClusterConfiguration();
+                if (data != null) {
+                    config.mergeUnframed(data);
+                }
+                config.setMaster(node);
+                // Write the same unframed encoding that every reader of this
+                // node parses with mergeUnframed().
+                return config.toUnframedByteArray();
+            }
+        });
+    }
+
+ /**
+ * Callback that computes the new content of a ZooKeeper node from its
+ * current content.
+ *
+ * @param <T> the exception type the callback may throw
+ */
+ interface Updater<T extends Throwable> {
+ // data is the node's current content, or null when the node does not exist yet.
+ byte[] update(byte[] data) throws T;
+ }
+
+ private <T extends Throwable> void update(String path, CreateMode persistent, Updater<T> updater) throws InvalidProtocolBufferException, KeeperException, InterruptedException, T {
+ Stat stat = zk.exists(path, false);
+ if (stat != null) {
+ byte[] data = zk.getData(path, false, stat);
+ data = updater.update(data);
+ zk.setData(path, data, stat.getVersion());
+ } else {
+ byte[] update = updater.update(null);
+ try {
+ zk.create(path, update, Ids.OPEN_ACL_UNSAFE, persistent);
+ } catch (NodeExistsException ignore) {
+ stat = zk.exists(path, false);
+ byte[] data = zk.getData(path, false, stat);
+ data = updater.update(data);
+ zk.setData(path, data, stat.getVersion());
+ }
+ }
+ }
+
+    /**
+     * Makes sure every ancestor of the given node path exists.
+     *
+     * @param path an absolute ZooKeeper path
+     */
+    private void mkParentDirs(String path) throws KeeperException, InterruptedException {
+        int lastIndexOf = path.lastIndexOf("/");
+        // An index of 0 means the parent is the root, which always exists;
+        // asking for it would pass an empty path to mkdirs().
+        if (lastIndexOf > 0) {
+            mkdirs(path.substring(0, lastIndexOf));
+        }
+    }
+
+ private void mkdirs(String path) throws KeeperException, InterruptedException {
+ if (zk.exists(path, false) != null) {
+ return;
+ }
+ // Remove the leading /
+ if (path.startsWith("/")) {
+ path = path.substring(1);
+ }
+ String[] split = path.split("/");
+ String cur = "";
+ for (String node : split) {
+ cur += "/" + node;
+ try {
+ zk.create(cur, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (NodeExistsException ignore) {
+ }
+ }
+ }
+
+}
Modified: activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto?rev=718453&r1=718452&r2=718453&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto (original)
+++ activemq/sandbox/kahadb/src/main/proto/kahadb-replication.proto Mon Nov 17 16:53:32 2008
@@ -96,6 +96,7 @@
// The files that the slave should delete
repeated string delete_files=2;
}
+
message PBJournalUpdate {
// Journal location of the update.
required PBJournalLocation location=1;
@@ -108,3 +109,39 @@
optional bool disk_sync=4;
}
+//
+// Holds the cluster-wide configuration: an optional broker configuration,
+// the members that have joined the cluster and the last elected master.
+//
+message PBClusterConfiguration {
+ // Would be nice if the configuration of the broker was setup cluster wide. We could
+ // stuff the spring config in here.. That way pushing out changes to the rest of the
+ // cluster would be very easy.
+ optional bytes broker_configuration=1;
+ // Who are the nodes that have joined the cluster. They may not all be online.
+ // Comes in handy to see if there are enough online members to form a quorum.
+ repeated string members=2;
+ // Who was the last elected master.
+ optional string master=3;
+}
+
+//
+// The status of a single cluster node: its replication state, the URI
+// it can be reached at and the last journal location it has.
+//
+message PBClusterNodeStatus {
+
+ // NOTE(review): the value 2 is not assigned to any state -- confirm
+ // whether the gap is intentional.
+ enum State {
+ // When the slave initially starts up it
+ // is not connected to a master.
+ SLAVE_UNCONNECTED = 0;
+ // When the slave first attaches to a master, it must first
+ // synchronize with the master to get any data updates
+ // that were missed while he was offline.
+ SLAVE_SYNCRONIZING = 1;
+ // The slave is caught up and is only actively applying
+ // real time updates from the master.
+ SLAVE_ONLINE = 3;
+ // This node is the master.
+ MASTER = 4;
+ }
+
+ // The current replication state of the node.
+ required State state=1;
+ // The URI of the node.
+ optional string connect_uri=2;
+ // The journal location of the node's last update.
+ optional PBJournalLocation last_update=3;
+}
Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java?rev=718453&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManagerTest.java Mon Nov 17 16:53:32 2008
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.replication.zk;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.util.Callback;
+import org.apache.kahadb.replication.ClusterListener;
+import org.apache.kahadb.replication.ClusterState;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus;
+import org.apache.kahadb.replication.pb.PBJournalLocation;
+import org.apache.kahadb.replication.pb.PBClusterNodeStatus.State;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+
+public class ZooKeeperClusterStateManagerTest extends TestCase {
+
+ private static final int PORT = 2181;
+ private ZooKeeperClusterStateManager zkcsm1;
+ private ZooKeeper zk;
+ private Factory serverFactory;
+
+    /**
+     * Polls a ZooKeeper server with the "stat" command until it answers
+     * with its version banner or the timeout elapses.
+     *
+     * @param host    the server host
+     * @param port    the server client port
+     * @param timeout the maximum time to keep trying, in milliseconds
+     * @return true if the server answered in time, false otherwise
+     */
+    public static boolean waitForServerUp(String host, int port, long timeout) {
+        final long startedAt = System.currentTimeMillis();
+        for (;;) {
+            try {
+                Socket socket = new Socket(host, port);
+                BufferedReader in = null;
+                try {
+                    OutputStream out = socket.getOutputStream();
+                    out.write("stat".getBytes());
+                    out.flush();
+
+                    in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+                    String firstLine = in.readLine();
+                    if (firstLine != null && firstLine.startsWith("Zookeeper version:")) {
+                        return true;
+                    }
+                } finally {
+                    socket.close();
+                    if (in != null) {
+                        in.close();
+                    }
+                }
+            } catch (IOException e) {
+                // The server is not accepting connections yet; try again.
+            }
+
+            if (System.currentTimeMillis() > startedAt + timeout) {
+                return false;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+
+ /**
+ * Starts an in-process ZooKeeper server on PORT backed by
+ * target/test-data/zookeeper, opens a client connection through a new
+ * state manager and deletes the cluster path left by a previous run.
+ */
+ @Override
+ protected void setUp() throws Exception {
+
+ ServerStats.registerAsConcrete();
+
+ // The same directory is passed twice to the server constructor below.
+ File tmpDir = new File("target/test-data/zookeeper");
+ tmpDir.mkdirs();
+
+ // Reduces startup time..
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100);
+
+ ZooKeeperServer zs = new ZooKeeperServer(tmpDir, tmpDir, 3000);
+
+ serverFactory = new NIOServerCnxn.Factory(PORT);
+ serverFactory.startup(zs);
+
+// assertTrue("waiting for server up", waitForServerUp("localhost", PORT, 1000*5));
+
+ // zkcsm1 is only used here to create the connection; the tests start it themselves.
+ zkcsm1 = new ZooKeeperClusterStateManager();
+ zk = zkcsm1.createZooKeeperConnection();
+ // Cleanup after previous run...
+ zkRecusiveDelete(zkcsm1.getPath());
+ }
+
+ private void zkRecusiveDelete(String path) throws KeeperException, InterruptedException {
+ Stat stat = zk.exists(path, false);
+ if( stat!=null ) {
+ if( stat.getNumChildren() > 0 ) {
+ List<String> children = zk.getChildren(path, false);
+ for (String node : children) {
+ zkRecusiveDelete(path+"/"+node);
+ }
+ }
+ zk.delete(path, stat.getVersion());
+ }
+ }
+
+    /**
+     * Closes the test's ZooKeeper client and shuts down the in-process
+     * server. Each step runs even if the previous one fails, so a failing
+     * close cannot leave the server holding the port for the next test.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            zk.close();
+        } finally {
+            try {
+                serverFactory.shutdown();
+            } finally {
+                ServerStats.unregister();
+            }
+        }
+    }
+
+ /**
+ * Two unconnected slaves join the election one second apart. The test
+ * expects node 2, which reports the later journal location (file 2
+ * versus file 1), to be published as master to both managers.
+ */
+ public void testTwoNodesGoingOnline() throws Exception {
+ final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+ final LinkedBlockingQueue<ClusterState> stateEvents2 = new LinkedBlockingQueue<ClusterState>();
+
+ zkcsm1.addListener(new ClusterListener() {
+ public void onClusterChange(ClusterState config) {
+ stateEvents1.add(config);
+ }
+ });
+ zkcsm1.start();
+ zkcsm1.addMember("kdbr://localhost:60001");
+
+ final ZooKeeperClusterStateManager zkcsm2 = new ZooKeeperClusterStateManager();
+ zkcsm2.addListener(new ClusterListener() {
+ public void onClusterChange(ClusterState config) {
+ stateEvents2.add(config);
+ }
+ });
+ zkcsm2.start();
+ zkcsm2.addMember("kdbr://localhost:60002");
+
+ // Drain the events..
+ while( stateEvents1.poll(100, TimeUnit.MILLISECONDS)!=null ) {
+ }
+ while( stateEvents2.poll(100, TimeUnit.MILLISECONDS)!=null ) {
+ }
+
+ // Bring node 1 online
+ final PBClusterNodeStatus status1 = new PBClusterNodeStatus();
+ status1.setConnectUri("kdbr://localhost:60001");
+ status1.setLastUpdate(new PBJournalLocation().setFileId(1).setOffset(50));
+ status1.setState(State.SLAVE_UNCONNECTED);
+
+ executeAsync(new Callback() {
+ public void execute() throws Exception {
+ zkcsm1.setMemberStatus(status1);
+ }
+ });
+
+ // Bring node 2 online
+ final PBClusterNodeStatus status2 = new PBClusterNodeStatus();
+ status2.setConnectUri("kdbr://localhost:60002");
+ status2.setLastUpdate(new PBJournalLocation().setFileId(2).setOffset(20));
+ status2.setState(State.SLAVE_UNCONNECTED);
+
+ executeAsync(new Callback() {
+ public void execute() throws Exception {
+ // Join one second after node 1, i.e. inside node 1's 5 second takeover delay.
+ Thread.sleep(1000);
+ zkcsm2.setMemberStatus(status2);
+ }
+ });
+
+ // The next event on each queue must already name node 2 as master.
+ ClusterState state = stateEvents1.poll(10, TimeUnit.SECONDS);
+ assertNotNull(state);
+ assertNotNull(state.getMaster());
+ assertEquals("kdbr://localhost:60002", state.getMaster());
+ assertTrue(state.getSlaves().size()==1);
+
+ state = stateEvents2.poll(1, TimeUnit.SECONDS);
+ assertNotNull(state);
+ assertNotNull(state.getMaster());
+ assertEquals("kdbr://localhost:60002", state.getMaster());
+ assertTrue(state.getSlaves().size()==1);
+
+ zkcsm2.stop();
+ zkcsm1.stop();
+ }
+
+ private void executeAsync(final Callback callback) {
+ new Thread("Async Test Task") {
+ @Override
+ public void run() {
+ try {
+ callback.execute();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ }
+
+ /**
+ * A single node joins the cluster and then reports its status. The test
+ * expects it to be listed as a slave first and then, within ten
+ * seconds, to be published as the master with no slaves left.
+ */
+ public void testOneNodeGoingOnline() throws Exception {
+ final LinkedBlockingQueue<ClusterState> stateEvents1 = new LinkedBlockingQueue<ClusterState>();
+ zkcsm1.addListener(new ClusterListener() {
+ public void onClusterChange(ClusterState config) {
+ stateEvents1.add(config);
+ }
+ });
+ zkcsm1.start();
+
+ // Drain the events..
+ while( stateEvents1.poll(100, TimeUnit.MILLISECONDS)!=null ) {
+ }
+
+ // Let node1 join the cluster.
+ zkcsm1.addMember("kdbr://localhost:60001");
+
+ // Joining alone does not elect a master; the node shows up as a slave.
+ ClusterState state = stateEvents1.poll(1, TimeUnit.SECONDS);
+ assertNotNull(state);
+ assertNull(state.getMaster());
+ assertTrue(state.getSlaves().size()==1);
+
+ // Let the cluster know that node1 is online..
+ PBClusterNodeStatus status = new PBClusterNodeStatus();
+ status.setConnectUri("kdbr://localhost:60001");
+ status.setLastUpdate(new PBJournalLocation().setFileId(0).setOffset(0));
+ status.setState(State.SLAVE_UNCONNECTED);
+ zkcsm1.setMemberStatus(status);
+
+ // An unconnected slave waits 5 seconds before taking over, hence the 10 second poll.
+ state = stateEvents1.poll(10, TimeUnit.SECONDS);
+ assertNotNull(state);
+ assertNotNull(state.getMaster());
+ assertEquals("kdbr://localhost:60001", state.getMaster());
+ assertTrue(state.getSlaves().isEmpty());
+
+ zkcsm1.stop();
+ }
+}
Modified: activemq/sandbox/kahadb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/resources/log4j.properties?rev=718453&r1=718452&r2=718453&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/resources/log4j.properties (original)
+++ activemq/sandbox/kahadb/src/test/resources/log4j.properties Mon Nov 17 16:53:32 2008
@@ -21,6 +21,7 @@
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.zookeeper=WARN
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender