You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/03/15 18:39:33 UTC

svn commit: r1301114 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/ core/src/test/java/org/apache/hama/bsp/

Author: tjungblut
Date: Thu Mar 15 17:39:33 2012
New Revision: 1301114

URL: http://svn.apache.org/viewvc?rev=1301114&view=rev
Log:
[HAMA-499]: Refactor clearZKNodes() in BSPMaster contributed by Apurv Verma


Added:
    incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
      - copied unchanged from r1301099, incubator/hama/trunk/core/src/main/java/org/apache/hama/MiniZooKeeperCluster.java
Removed:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/MiniZooKeeperCluster.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1301114&r1=1301113&r2=1301114&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Mar 15 17:39:33 2012
@@ -13,6 +13,7 @@ Release 0.5 - Unreleased
 
   IMPROVEMENTS
 
+    HAMA-499: Refactor clearZKNodes() in BSPMaster (Apurv Verma via tjungblut)
     HAMA-485: Fill Counters with useful information (tjungblut)
     HAMA-497: Switch the trunk to Hadoop 1.0 based (edwardyoon) 
     HAMA-445: Make configurable checkpointing (Suraj Menon via edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1301114&r1=1301113&r2=1301114&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Thu Mar 15 17:39:33 2012
@@ -65,7 +65,7 @@ import org.apache.zookeeper.data.Stat;
  * jobs.
  */
 public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
-    GroomServerManager, Watcher {
+GroomServerManager, Watcher {
 
   public static final Log LOG = LogFactory.getLog(BSPMaster.class);
   public static final String localModeMessage = "Local mode detected, no launch of the daemon needed.";
@@ -240,12 +240,12 @@ public class BSPMaster implements JobSub
    * Start the BSPMaster process, listen on the indicated hostname/port
    */
   public BSPMaster(HamaConfiguration conf) throws IOException,
-      InterruptedException {
+  InterruptedException {
     this(conf, generateNewIdentifier());
   }
 
   BSPMaster(HamaConfiguration conf, String identifier) throws IOException,
-      InterruptedException {
+  InterruptedException {
     this.conf = conf;
     this.masterIdentifier = identifier;
 
@@ -492,27 +492,43 @@ public class BSPMaster implements JobSub
     }
   }
 
-  public void clearZKNodes() {
+  /**
+   * Clears all sub-children of node bspRoot 
+   */
+  public void clearZKNodes(){
     try {
-      for (String node : zk.getChildren(bspRoot, this)) {
-        for (String subnode : zk.getChildren(bspRoot + "/" + node, this)) {
-          for (String subnode2 : zk.getChildren(bspRoot + "/" + node, this)) {
-            for (String subnode3 : zk.getChildren(bspRoot + "/" + node + "/"
-                + subnode2, this)) {
-              zk.delete(bspRoot + "/" + node + "/" + subnode + "/" + subnode2
-                  + "/" + subnode3, 0);
-            }
-            zk.delete(bspRoot + "/" + node + "/" + subnode + "/" + subnode2, 0);
-          }
-          zk.delete(bspRoot + "/" + node + "/" + subnode, 0);
-        }
-        zk.delete(bspRoot + "/" + node, 0);
+      Stat s = zk.exists(bspRoot, false);
+      if(s != null){
+        clearZKNodes(bspRoot);
+      }      
+
+    } catch (Exception e) {
+      LOG.warn("Could not clear zookeeper nodes.", e);
+
+    }      
+  }
+
+  /**
+   * Clears all sub-children of node rooted at path.
+   * @param path
+   * @throws InterruptedException 
+   * @throws KeeperException 
+   */
+  private void clearZKNodes(String path) throws KeeperException, InterruptedException{
+    ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
+
+    if(list.size() == 0){
+      return;
+
+    }else{
+      for(String node:list){
+        clearZKNodes(path+"/"+node);
+        zk.delete(path+"/"+node, -1); //delete any version of this node.
       }
-    } catch (KeeperException e) {
-    } catch (InterruptedException e) {
     }
   }
 
+
   public void createJobRoot(String string) {
     try {
       zk.create("/" + string, new byte[0], Ids.OPEN_ACL_UNSAFE,

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1301114&r1=1301113&r2=1301114&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Thu Mar 15 17:39:33 2012
@@ -100,6 +100,7 @@ public abstract class HamaClusterTestCas
         shutdownDfs(dfsCluster);
       }
       bspCluster.shutdown();
+      zooKeeperCluster.shutdown();
     } catch (Exception e) {
       LOG.error(e);
     }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1301114&r1=1301113&r2=1301114&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Mar 15 17:39:33 2012
@@ -19,6 +19,9 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.IOException;
+import java.util.ArrayList;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +35,14 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.examples.ClassSerializePrinting;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
 
 public class TestBSPMasterGroomServer extends HamaCluster {
 
@@ -58,6 +69,14 @@ public class TestBSPMasterGroomServer ex
     super.setUp();
   }
 
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  /*
+   * BEGIN: Job submission tests.
+   */
+
   public void testSubmitJob() throws Exception {
     BSPJob bsp = new BSPJob(configuration,
         org.apache.hama.examples.ClassSerializePrinting.class);
@@ -113,7 +132,71 @@ public class TestBSPMasterGroomServer ex
     fileSys.delete(new Path(TMP_OUTPUT), true);
   }
 
-  public void tearDown() throws Exception {
-    super.tearDown();
+  /*
+   * END: Job submission tests.
+   */
+
+  /*
+   * BEGIN: ZooKeeper tests.
+   */
+
+  public void testClearZKNodes() throws IOException, KeeperException,
+      InterruptedException {
+
+    // Clear any existing znode with the same path as bspRoot.
+    bspCluster.getBSPMaster().clearZKNodes();
+
+    int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
+        6000);
+    String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
+    String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT,
+        Constants.DEFAULT_ZOOKEEPER_ROOT);
+
+    // Establishing a zk session.
+    ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // Do nothing.(Dummy Watcher)
+      }
+    });
+
+    // Creating dummy bspRoot if it doesn't already exist.
+    Stat s = zk.exists(bspRoot, false);
+    if (s == null) {
+      zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT);
+    }
+
+    // Creating dummy child nodes at depth 1.
+    String node1 = bspRoot + "/task1";
+    String node2 = bspRoot + "/task2";
+    zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+    // Creating dummy child node at depth 2.
+    String node11 = node1 + "/superstep1";
+    zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+    ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot, false);
+    assertEquals(2, list.size());
+    System.out.println(list.size());
+
+    bspCluster.getBSPMaster().clearZKNodes();
+
+    list = (ArrayList<String>) zk.getChildren(bspRoot, false);
+    System.out.println(list.size());
+    assertEquals(0, list.size());
+
+    try {
+      zk.getData(node11, false, null);
+      fail();
+    } catch (KeeperException.NoNodeException e) {
+      System.out.println("Node has been removed correctly!");
+    }
   }
+
+  /*
+   * END: ZooKeeper tests.
+   */
+
 }