You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/03/25 00:20:24 UTC

svn commit: r1085190 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/zookeeper/ src/test/java/org/apache/hadoop/hbase/

Author: stack
Date: Thu Mar 24 23:20:24 2011
New Revision: 1085190

URL: http://svn.apache.org/viewvc?rev=1085190&view=rev
Log:
HBASE-3052 Add ability to have multiple ZK servers in a quorum in MiniZooKeeperCluster for test writing

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Mar 24 23:20:24 2011
@@ -100,6 +100,8 @@ Release 0.91.0 - Unreleased
                (Karthick Sankarachary via Stack)
    HBASE-3474  HFileOutputFormat to use column family's compression algorithm
    HBASE-3541  REST Multi Gets (Elliott Clark via Stack)
+   HBASE-3052  Add ability to have multiple ZK servers in a quorum in
+               MiniZooKeeperCluster for test writing (Liyin Tang via Stack)
 
   TASK
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMasterCommandLine.java Thu Mar 24 23:20:24 2011
@@ -121,7 +121,7 @@ public class HMasterCommandLine extends 
         if (zkClientPort == 0) {
           throw new IOException("No config value for hbase.zookeeper.property.clientPort");
         }
-        zooKeeperCluster.setClientPort(zkClientPort);
+        zooKeeperCluster.setDefaultClientPort(zkClientPort);
         int clientPort = zooKeeperCluster.startup(zkDataPath);
         if (clientPort != zkClientPort) {
           String errorMsg = "Couldnt start ZK at requested address of " +

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java Thu Mar 24 23:20:24 2011
@@ -28,6 +28,8 @@ import java.io.Reader;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,27 +50,45 @@ public class MiniZooKeeperCluster {
   private static final int CONNECTION_TIMEOUT = 30000;
 
   private boolean started;
-  private int clientPort = 21818; // use non-standard port
 
-  private NIOServerCnxn.Factory standaloneServerFactory;
+  private int defaultClientPort = 21818; // use non-standard port
+  private int clientPort = defaultClientPort; 
+  
+  private List<NIOServerCnxn.Factory> standaloneServerFactoryList;
+  private List<ZooKeeperServer> zooKeeperServers;
+  private List<Integer> clientPortList;
+  
+  private int activeZKServerIndex;
   private int tickTime = 0;
 
   /** Create mini ZooKeeper cluster. */
   public MiniZooKeeperCluster() {
     this.started = false;
+    activeZKServerIndex = -1;
+    zooKeeperServers = new ArrayList<ZooKeeperServer>();
+    clientPortList = new ArrayList<Integer>();
+    standaloneServerFactoryList = new ArrayList<NIOServerCnxn.Factory>();
   }
 
-  public void setClientPort(int clientPort) {
-    this.clientPort = clientPort;
+  public void setDefaultClientPort(int clientPort) {
+    this.defaultClientPort = clientPort;
   }
 
-  public int getClientPort() {
-    return clientPort;
+  public int getDefaultClientPort() {
+    return defaultClientPort;
   }
 
   public void setTickTime(int tickTime) {
     this.tickTime = tickTime;
   }
+  
+  public int getBackupZooKeeperServerNum() {
+    return zooKeeperServers.size()-1;
+  }
+  
+  public int getZooKeeperServerNum() {
+    return zooKeeperServers.size();
+  }
 
   // / XXX: From o.a.zk.t.ClientBase
   private static void setupTestEnv() {
@@ -79,50 +99,70 @@ public class MiniZooKeeperCluster {
     System.setProperty("zookeeper.preAllocSize", "100");
     FileTxnLog.setPreallocSize(100);
   }
+  
+  public int startup(File baseDir) throws IOException,
+  InterruptedException {
+    return startup(baseDir,1);
+  }
 
   /**
    * @param baseDir
+   * @param numZooKeeperServers number of ZK servers to start; if not positive, nothing is started and -1 is returned
    * @return ClientPort server bound to.
    * @throws IOException
    * @throws InterruptedException
    */
-  public int startup(File baseDir) throws IOException,
+  public int startup(File baseDir, int numZooKeeperServers) throws IOException,
       InterruptedException {
+    if (numZooKeeperServers <= 0)
+      return -1;
 
     setupTestEnv();
-
     shutdown();
-
-    File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
-    recreateDir(dir);
-
-    int tickTimeToUse;
-    if (this.tickTime > 0) {
-      tickTimeToUse = this.tickTime;
-    } else {
-      tickTimeToUse = TICK_TIME;
-    }
-    ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
-    while (true) {
-      try {
-        standaloneServerFactory =
-          new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
-      } catch (BindException e) {
-        LOG.info("Failed binding ZK Server to client port: " + clientPort);
-        //this port is already in use. try to use another
-        clientPort++;
-        continue;
+    
+    // start all the ZK servers, each with its own data directory
+    for (int i = 0; i < numZooKeeperServers; i++) {
+      File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
+      recreateDir(dir);
+      clientPort = defaultClientPort;
+      int tickTimeToUse;
+      if (this.tickTime > 0) {
+        tickTimeToUse = this.tickTime;
+      } else {
+        tickTimeToUse = TICK_TIME;
       }
-      break;
-    }
-    standaloneServerFactory.startup(server);
-
-    if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
-      throw new IOException("Waiting for startup of standalone server");
+      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);    
+      NIOServerCnxn.Factory standaloneServerFactory;
+      while (true) {
+        try {
+          standaloneServerFactory =
+            new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
+        } catch (BindException e) {
+          LOG.info("Failed binding ZK Server to client port: " + clientPort);
+          //this port is already in use. try to use another
+          clientPort++;
+          continue;
+        }
+        break;
+      }
+      
+      // Start up this ZK server
+      standaloneServerFactory.startup(server);  
+      if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
+        throw new IOException("Waiting for startup of standalone server");
+      }
+      
+      clientPortList.add(clientPort);
+      standaloneServerFactoryList.add(standaloneServerFactory);
+      zooKeeperServers.add(server);
     }
-
+    
+    // set the first one to be active ZK; Others are backups
+    activeZKServerIndex = 0;
     started = true;
-    LOG.info("Started MiniZK Server on client port: " + clientPort);
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+    		"on client port: " + clientPort);
     return clientPort;
   }
 
@@ -144,13 +184,96 @@ public class MiniZooKeeperCluster {
     if (!started) {
       return;
     }
+    // shut down all the zk servers
+    for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+      NIOServerCnxn.Factory standaloneServerFactory = 
+        standaloneServerFactoryList.get(i);      
+      int clientPort = clientPortList.get(i);
+      
+      standaloneServerFactory.shutdown();
+      if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+        throw new IOException("Waiting for shutdown of standalone server");
+      }
+    }
 
+    // clear everything
+    started = false;
+    activeZKServerIndex = 0;
+    standaloneServerFactoryList.clear();
+    clientPortList.clear();
+    zooKeeperServers.clear();
+    
+    LOG.info("Shutdown MiniZK cluster with all ZK servers");
+  }
+  
+  /**@return the client port of the backup ZK server that takes over after the
+   *         current active server is killed; -1 if there are no backups left.
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  public int killCurrentActiveZooKeeperServer() throws IOException, 
+                                        InterruptedException {
+    if (!started || activeZKServerIndex < 0 ) {
+      return -1;
+    }
+    
+    // Shutdown the current active one
+    NIOServerCnxn.Factory standaloneServerFactory = 
+      standaloneServerFactoryList.get(activeZKServerIndex);
+    int clientPort = clientPortList.get(activeZKServerIndex);
+    
     standaloneServerFactory.shutdown();
     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
       throw new IOException("Waiting for shutdown of standalone server");
     }
-
-    started = false;
+    
+    // remove the current active zk server
+    standaloneServerFactoryList.remove(activeZKServerIndex);
+    clientPortList.remove(activeZKServerIndex);
+    zooKeeperServers.remove(activeZKServerIndex);    
+    LOG.info("Kill the current active ZK servers in the cluster " +
+        "on client port: " + clientPort);
+    
+    if (standaloneServerFactoryList.size() == 0) {
+      // there are no backup servers left
+      return -1;
+    }
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Activate a backup zk server in the cluster " +
+        "on client port: " + clientPort);
+    // return the next backup zk server's port
+    return clientPort;
+  }
+  
+  /**
+   * Kill one backup ZK server.
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  public void killOneBackupZooKeeperServer() throws IOException, 
+                                        InterruptedException {
+    if (!started || activeZKServerIndex < 0 || 
+        standaloneServerFactoryList.size() <= 1) {
+      return ;
+    }
+    
+    int backupZKServerIndex = activeZKServerIndex+1;
+    // Shutdown the backup server that follows the current active one
+    NIOServerCnxn.Factory standaloneServerFactory = 
+      standaloneServerFactoryList.get(backupZKServerIndex);
+    int clientPort = clientPortList.get(backupZKServerIndex);
+    
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+    
+    // remove this backup zk server
+    standaloneServerFactoryList.remove(backupZKServerIndex);
+    clientPortList.remove(backupZKServerIndex);
+    zooKeeperServers.remove(backupZKServerIndex);    
+    LOG.info("Kill one backup ZK servers in the cluster " +
+        "on client port: " + clientPort);
   }
 
   // XXX: From o.a.zk.t.ClientBase

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Mar 24 23:20:24 2011
@@ -245,18 +245,38 @@ public class HBaseTestingUtility {
    * @return zk cluster started.
    */
   public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
-    return startMiniZKCluster(setupClusterTestBuildDir());
+    return startMiniZKCluster(setupClusterTestBuildDir(),1);
 
   }
+  
+  /**
+   * Call this if you only want a zk cluster.
+   * @param zooKeeperServerNum number of ZK servers to start in the mini ZK cluster
+   * @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster.
+   * @throws Exception
+   * @see #shutdownMiniZKCluster()
+   * @return zk cluster started.
+   */
+  public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum) 
+      throws Exception {
+    return startMiniZKCluster(setupClusterTestBuildDir(), zooKeeperServerNum);
 
+  }
+  
   private MiniZooKeeperCluster startMiniZKCluster(final File dir)
+    throws Exception {
+    return startMiniZKCluster(dir,1);
+  }
+  
+  private MiniZooKeeperCluster startMiniZKCluster(final File dir, 
+      int zooKeeperServerNum)
   throws Exception {
     this.passedZkCluster = false;
     if (this.zkCluster != null) {
       throw new IOException("Cluster already running at " + dir);
     }
     this.zkCluster = new MiniZooKeeperCluster();
-    int clientPort = this.zkCluster.startup(dir);
+    int clientPort = this.zkCluster.startup(dir,zooKeeperServerNum);
     this.conf.set("hbase.zookeeper.property.clientPort",
       Integer.toString(clientPort));
     return this.zkCluster;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java?rev=1085190&r1=1085189&r2=1085190&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/TestHBaseTestingUtility.java Thu Mar 24 23:20:24 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -136,6 +137,47 @@ public class TestHBaseTestingUtility {
       cluster.shutdown();
     }
   }
+  
+  @Test public void testMiniZooKeeper() throws Exception {
+    MiniZooKeeperCluster cluster1 = this.hbt.startMiniZKCluster();
+    try {
+      assertEquals(0, cluster1.getBackupZooKeeperServerNum());    
+      assertTrue((cluster1.killCurrentActiveZooKeeperServer() == -1));
+    } finally {
+      cluster1.shutdown();
+    }
+    
+    this.hbt.shutdownMiniZKCluster();
+    
+    // set up zookeeper cluster with 5 zk servers
+    MiniZooKeeperCluster cluster2 = this.hbt.startMiniZKCluster(5);
+    int defaultClientPort = 21818;
+    cluster2.setDefaultClientPort(defaultClientPort);
+    try {
+      assertEquals(4, cluster2.getBackupZooKeeperServerNum());
+      
+      // killing the current active zk server
+      assertTrue((cluster2.killCurrentActiveZooKeeperServer() >= defaultClientPort));
+      assertTrue((cluster2.killCurrentActiveZooKeeperServer() >= defaultClientPort));     
+      assertEquals(2, cluster2.getBackupZooKeeperServerNum());
+      assertEquals(3, cluster2.getZooKeeperServerNum());
+      
+      // killing the backup zk servers
+      cluster2.killOneBackupZooKeeperServer();
+      cluster2.killOneBackupZooKeeperServer();
+      assertEquals(0, cluster2.getBackupZooKeeperServerNum());
+      assertEquals(1, cluster2.getZooKeeperServerNum());
+      
+      // killing the last zk server
+      assertTrue((cluster2.killCurrentActiveZooKeeperServer() == -1));
+      // this should do nothing.
+      cluster2.killOneBackupZooKeeperServer();
+      assertEquals(-1, cluster2.getBackupZooKeeperServerNum());
+      assertEquals(0, cluster2.getZooKeeperServerNum());         
+    } finally {
+      cluster2.shutdown();
+    }
+  }
 
   @Test public void testMiniDFSCluster() throws Exception {
     MiniDFSCluster cluster = this.hbt.startMiniDFSCluster(1);