You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/03/25 00:20:24 UTC
svn commit: r1085190 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/zookeeper/
src/test/java/org/apache/hadoop/hbase/
Author: stack
Date: Thu Mar 24 23:20:24 2011
New Revision: 1085190
URL: http://svn.apache.org/viewvc?rev=1085190&view=rev
Log:
HBASE-3052 Add ability to have multiple ZK servers in a quorum in MiniZooKeeperCluster for test writing
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Mar 24 23:20:24 2011
@@ -100,6 +100,8 @@ Release 0.91.0 - Unreleased
(Karthick Sankarachary via Stack)
HBASE-3474 HFileOutputFormat to use column family's compression algorithm
HBASE-3541 REST Multi Gets (Elliott Clark via Stack)
+ HBASE-3052 Add ability to have multiple ZK servers in a quorum in
+ MiniZooKeeperCluster for test writing (Liyin Tang via Stack)
TASK
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java Thu Mar 24 23:20:24 2011
@@ -121,7 +121,7 @@ public class HMasterCommandLine extends
if (zkClientPort == 0) {
throw new IOException("No config value for hbase.zookeeper.property.clientPort");
}
- zooKeeperCluster.setClientPort(zkClientPort);
+ zooKeeperCluster.setDefaultClientPort(zkClientPort);
int clientPort = zooKeeperCluster.startup(zkDataPath);
if (clientPort != zkClientPort) {
String errorMsg = "Couldnt start ZK at requested address of " +
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java Thu Mar 24 23:20:24 2011
@@ -28,6 +28,8 @@ import java.io.Reader;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,27 +50,45 @@ public class MiniZooKeeperCluster {
private static final int CONNECTION_TIMEOUT = 30000;
private boolean started;
- private int clientPort = 21818; // use non-standard port
- private NIOServerCnxn.Factory standaloneServerFactory;
+ private int defaultClientPort = 21818; // use non-standard port
+ private int clientPort = defaultClientPort;
+
+ private List<NIOServerCnxn.Factory> standaloneServerFactoryList;
+ private List<ZooKeeperServer> zooKeeperServers;
+ private List<Integer> clientPortList;
+
+ private int activeZKServerIndex;
private int tickTime = 0;
/** Create mini ZooKeeper cluster. */
public MiniZooKeeperCluster() {
this.started = false;
+ activeZKServerIndex = -1;
+ zooKeeperServers = new ArrayList<ZooKeeperServer>();
+ clientPortList = new ArrayList<Integer>();
+ standaloneServerFactoryList = new ArrayList<NIOServerCnxn.Factory>();
}
- public void setClientPort(int clientPort) {
- this.clientPort = clientPort;
+ public void setDefaultClientPort(int clientPort) {
+ this.defaultClientPort = clientPort;
}
- public int getClientPort() {
- return clientPort;
+ public int getDefaultClientPort() {
+ return defaultClientPort;
}
public void setTickTime(int tickTime) {
this.tickTime = tickTime;
}
+
+ public int getBackupZooKeeperServerNum() {
+ return zooKeeperServers.size()-1;
+ }
+
+ public int getZooKeeperServerNum() {
+ return zooKeeperServers.size();
+ }
// / XXX: From o.a.zk.t.ClientBase
private static void setupTestEnv() {
@@ -79,50 +99,70 @@ public class MiniZooKeeperCluster {
System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100);
}
+
+ public int startup(File baseDir) throws IOException,
+ InterruptedException {
+ return startup(baseDir,1);
+ }
/**
* @param baseDir
+ * @param numZooKeeperServers
* @return ClientPort server bound to.
* @throws IOException
* @throws InterruptedException
*/
- public int startup(File baseDir) throws IOException,
+ public int startup(File baseDir, int numZooKeeperServers) throws IOException,
InterruptedException {
+ if (numZooKeeperServers <= 0)
+ return -1;
setupTestEnv();
-
shutdown();
-
- File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
- recreateDir(dir);
-
- int tickTimeToUse;
- if (this.tickTime > 0) {
- tickTimeToUse = this.tickTime;
- } else {
- tickTimeToUse = TICK_TIME;
- }
- ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
- while (true) {
- try {
- standaloneServerFactory =
- new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
- } catch (BindException e) {
- LOG.info("Failed binding ZK Server to client port: " + clientPort);
- //this port is already in use. try to use another
- clientPort++;
- continue;
+
+ // running all the ZK servers
+ for (int i = 0; i < numZooKeeperServers; i++) {
+ File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
+ recreateDir(dir);
+ clientPort = defaultClientPort;
+ int tickTimeToUse;
+ if (this.tickTime > 0) {
+ tickTimeToUse = this.tickTime;
+ } else {
+ tickTimeToUse = TICK_TIME;
}
- break;
- }
- standaloneServerFactory.startup(server);
-
- if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
- throw new IOException("Waiting for startup of standalone server");
+ ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+ NIOServerCnxn.Factory standaloneServerFactory;
+ while (true) {
+ try {
+ standaloneServerFactory =
+ new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
+ } catch (BindException e) {
+ LOG.info("Failed binding ZK Server to client port: " + clientPort);
+ //this port is already in use. try to use another
+ clientPort++;
+ continue;
+ }
+ break;
+ }
+
+ // Start up this ZK server
+ standaloneServerFactory.startup(server);
+ if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for startup of standalone server");
+ }
+
+ clientPortList.add(clientPort);
+ standaloneServerFactoryList.add(standaloneServerFactory);
+ zooKeeperServers.add(server);
}
-
+
+ // set the first one to be active ZK; Others are backups
+ activeZKServerIndex = 0;
started = true;
- LOG.info("Started MiniZK Server on client port: " + clientPort);
+ clientPort = clientPortList.get(activeZKServerIndex);
+ LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+ "on client port: " + clientPort);
return clientPort;
}
@@ -144,13 +184,96 @@ public class MiniZooKeeperCluster {
if (!started) {
return;
}
+ // shut down all the zk servers
+ for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+ NIOServerCnxn.Factory standaloneServerFactory =
+ standaloneServerFactoryList.get(i);
+ int clientPort = clientPortList.get(i);
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+ }
+ // clear everything
+ started = false;
+ activeZKServerIndex = 0;
+ standaloneServerFactoryList.clear();
+ clientPortList.clear();
+ zooKeeperServers.clear();
+
+ LOG.info("Shutdown MiniZK cluster with all ZK servers");
+ }
+
+ /**@return the client port of the backup ZK server that takes over after the
+ * current active one is killed; -1 if not started or if no backup remains.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public int killCurrentActiveZooKeeperServer() throws IOException,
+ InterruptedException {
+ if (!started || activeZKServerIndex < 0 ) {
+ return -1;
+ }
+
+ // Shutdown the current active one
+ NIOServerCnxn.Factory standaloneServerFactory =
+ standaloneServerFactoryList.get(activeZKServerIndex);
+ int clientPort = clientPortList.get(activeZKServerIndex);
+
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
-
- started = false;
+
+ // remove the current active zk server
+ standaloneServerFactoryList.remove(activeZKServerIndex);
+ clientPortList.remove(activeZKServerIndex);
+ zooKeeperServers.remove(activeZKServerIndex);
+ LOG.info("Kill the current active ZK servers in the cluster " +
+ "on client port: " + clientPort);
+
+ if (standaloneServerFactoryList.size() == 0) {
+ // there are no backup servers left
+ return -1;
+ }
+ clientPort = clientPortList.get(activeZKServerIndex);
+ LOG.info("Activate a backup zk server in the cluster " +
+ "on client port: " + clientPort);
+ // return the next backup zk server's port
+ return clientPort;
+ }
+
+ /**
+ * Kill one backup ZK server (the one right after the current active server).
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void killOneBackupZooKeeperServer() throws IOException,
+ InterruptedException {
+ if (!started || activeZKServerIndex < 0 ||
+ standaloneServerFactoryList.size() <= 1) {
+ return ;
+ }
+
+ int backupZKServerIndex = activeZKServerIndex+1;
+ // Shutdown the backup server that follows the current active one
+ NIOServerCnxn.Factory standaloneServerFactory =
+ standaloneServerFactoryList.get(backupZKServerIndex);
+ int clientPort = clientPortList.get(backupZKServerIndex);
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+
+ // remove this backup zk server
+ standaloneServerFactoryList.remove(backupZKServerIndex);
+ clientPortList.remove(backupZKServerIndex);
+ zooKeeperServers.remove(backupZKServerIndex);
+ LOG.info("Kill one backup ZK servers in the cluster " +
+ "on client port: " + clientPort);
}
// XXX: From o.a.zk.t.ClientBase
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Mar 24 23:20:24 2011
@@ -245,18 +245,38 @@ public class HBaseTestingUtility {
* @return zk cluster started.
*/
public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
- return startMiniZKCluster(setupClusterTestBuildDir());
+ return startMiniZKCluster(setupClusterTestBuildDir(),1);
}
+
+ /**
+ * Call this if you only want a zk cluster.
+ * @param zooKeeperServerNum number of ZK servers to start in the mini cluster
+ * @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster.
+ * @throws Exception
+ * @see #shutdownMiniZKCluster()
+ * @return zk cluster started.
+ */
+ public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum)
+ throws Exception {
+ return startMiniZKCluster(setupClusterTestBuildDir(), zooKeeperServerNum);
+ }
+
private MiniZooKeeperCluster startMiniZKCluster(final File dir)
+ throws Exception {
+ return startMiniZKCluster(dir,1);
+ }
+
+ private MiniZooKeeperCluster startMiniZKCluster(final File dir,
+ int zooKeeperServerNum)
throws Exception {
this.passedZkCluster = false;
if (this.zkCluster != null) {
throw new IOException("Cluster already running at " + dir);
}
this.zkCluster = new MiniZooKeeperCluster();
- int clientPort = this.zkCluster.startup(dir);
+ int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum);
this.conf.set("hbase.zookeeper.property.clientPort",
Integer.toString(clientPort));
return this.zkCluster;
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java Thu Mar 24 23:20:24 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -136,6 +137,47 @@ public class TestHBaseTestingUtility {
cluster.shutdown();
}
}
+
+ @Test public void testMiniZooKeeper() throws Exception {
+ MiniZooKeeperCluster cluster1 = this.hbt.startMiniZKCluster();
+ try {
+ assertEquals(0, cluster1.getBackupZooKeeperServerNum());
+ assertTrue((cluster1.killCurrentActiveZooKeeperServer() == -1));
+ } finally {
+ cluster1.shutdown();
+ }
+
+ this.hbt.shutdownMiniZKCluster();
+
+ // set up zookeeper cluster with 5 zk servers
+ MiniZooKeeperCluster cluster2 = this.hbt.startMiniZKCluster(5);
+ int defaultClientPort = 21818;
+ cluster2.setDefaultClientPort(defaultClientPort);
+ try {
+ assertEquals(4, cluster2.getBackupZooKeeperServerNum());
+
+ // killing the current active zk server
+ assertTrue((cluster2.killCurrentActiveZooKeeperServer() >= defaultClientPort));
+ assertTrue((cluster2.killCurrentActiveZooKeeperServer() >= defaultClientPort));
+ assertEquals(2, cluster2.getBackupZooKeeperServerNum());
+ assertEquals(3, cluster2.getZooKeeperServerNum());
+
+ // killing the backup zk servers
+ cluster2.killOneBackupZooKeeperServer();
+ cluster2.killOneBackupZooKeeperServer();
+ assertEquals(0, cluster2.getBackupZooKeeperServerNum());
+ assertEquals(1, cluster2.getZooKeeperServerNum());
+
+ // killing the last zk server
+ assertTrue((cluster2.killCurrentActiveZooKeeperServer() == -1));
+ // this should do nothing.
+ cluster2.killOneBackupZooKeeperServer();
+ assertEquals(-1, cluster2.getBackupZooKeeperServerNum());
+ assertEquals(0, cluster2.getZooKeeperServerNum());
+ } finally {
+ cluster2.shutdown();
+ }
+ }
@Test public void testMiniDFSCluster() throws Exception {
MiniDFSCluster cluster = this.hbt.startMiniDFSCluster(1);