You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/03/15 18:39:33 UTC
svn commit: r1301114 - in /incubator/hama/trunk: ./
core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/
core/src/test/java/org/apache/hama/ core/src/test/java/org/apache/hama/bsp/
Author: tjungblut
Date: Thu Mar 15 17:39:33 2012
New Revision: 1301114
URL: http://svn.apache.org/viewvc?rev=1301114&view=rev
Log:
[HAMA-499]: Refactor clearZKNodes() in BSPMaster contributed by Apurv Verma
Added:
incubator/hama/trunk/core/src/test/java/org/apache/hama/MiniZooKeeperCluster.java
- copied unchanged from r1301099, incubator/hama/trunk/core/src/main/java/org/apache/hama/MiniZooKeeperCluster.java
Removed:
incubator/hama/trunk/core/src/main/java/org/apache/hama/MiniZooKeeperCluster.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1301114&r1=1301113&r2=1301114&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Mar 15 17:39:33 2012
@@ -13,6 +13,7 @@ Release 0.5 - Unreleased
IMPROVEMENTS
+ HAMA-499: Refactor clearZKNodes() in BSPMaster (Apurv Verma via tjungblut)
HAMA-485: Fill Counters with useful information (tjungblut)
HAMA-497: Switch the trunk to Hadoop 1.0 based (edwardyoon)
HAMA-445: Make configurable checkpointing (Suraj Menon via edwardyoon)
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1301114&r1=1301113&r2=1301114&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Thu Mar 15 17:39:33 2012
@@ -65,7 +65,7 @@ import org.apache.zookeeper.data.Stat;
* jobs.
*/
public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
- GroomServerManager, Watcher {
+GroomServerManager, Watcher {
public static final Log LOG = LogFactory.getLog(BSPMaster.class);
public static final String localModeMessage = "Local mode detected, no launch of the daemon needed.";
@@ -240,12 +240,12 @@ public class BSPMaster implements JobSub
* Start the BSPMaster process, listen on the indicated hostname/port
*/
public BSPMaster(HamaConfiguration conf) throws IOException,
- InterruptedException {
+ InterruptedException {
this(conf, generateNewIdentifier());
}
BSPMaster(HamaConfiguration conf, String identifier) throws IOException,
- InterruptedException {
+ InterruptedException {
this.conf = conf;
this.masterIdentifier = identifier;
@@ -492,27 +492,43 @@ public class BSPMaster implements JobSub
}
}
- public void clearZKNodes() {
+ /**
+ * Clears all sub-children of node bspRoot
+ */
+ public void clearZKNodes(){
try {
- for (String node : zk.getChildren(bspRoot, this)) {
- for (String subnode : zk.getChildren(bspRoot + "/" + node, this)) {
- for (String subnode2 : zk.getChildren(bspRoot + "/" + node, this)) {
- for (String subnode3 : zk.getChildren(bspRoot + "/" + node + "/"
- + subnode2, this)) {
- zk.delete(bspRoot + "/" + node + "/" + subnode + "/" + subnode2
- + "/" + subnode3, 0);
- }
- zk.delete(bspRoot + "/" + node + "/" + subnode + "/" + subnode2, 0);
- }
- zk.delete(bspRoot + "/" + node + "/" + subnode, 0);
- }
- zk.delete(bspRoot + "/" + node, 0);
+ Stat s = zk.exists(bspRoot, false);
+ if(s != null){
+ clearZKNodes(bspRoot);
+ }
+
+ } catch (Exception e) {
+ LOG.warn("Could not clear zookeeper nodes.", e);
+
+ }
+ }
+
+ /**
+ * Clears all sub-children of node rooted at path.
+ * @param path
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ private void clearZKNodes(String path) throws KeeperException, InterruptedException{
+ ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
+
+ if(list.size() == 0){
+ return;
+
+ }else{
+ for(String node:list){
+ clearZKNodes(path+"/"+node);
+ zk.delete(path+"/"+node, -1); //delete any version of this node.
}
- } catch (KeeperException e) {
- } catch (InterruptedException e) {
}
}
+
public void createJobRoot(String string) {
try {
zk.create("/" + string, new byte[0], Ids.OPEN_ACL_UNSAFE,
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1301114&r1=1301113&r2=1301114&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Thu Mar 15 17:39:33 2012
@@ -100,6 +100,7 @@ public abstract class HamaClusterTestCas
shutdownDfs(dfsCluster);
}
bspCluster.shutdown();
+ zooKeeperCluster.shutdown();
} catch (Exception e) {
LOG.error(e);
}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1301114&r1=1301113&r2=1301114&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Mar 15 17:39:33 2012
@@ -19,6 +19,9 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
+import java.util.ArrayList;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +35,14 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.examples.ClassSerializePrinting;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
public class TestBSPMasterGroomServer extends HamaCluster {
@@ -58,6 +69,14 @@ public class TestBSPMasterGroomServer ex
super.setUp();
}
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ /*
+ * BEGIN: Job submission tests.
+ */
+
public void testSubmitJob() throws Exception {
BSPJob bsp = new BSPJob(configuration,
org.apache.hama.examples.ClassSerializePrinting.class);
@@ -113,7 +132,71 @@ public class TestBSPMasterGroomServer ex
fileSys.delete(new Path(TMP_OUTPUT), true);
}
- public void tearDown() throws Exception {
- super.tearDown();
+ /*
+ * END: Job submission tests.
+ */
+
+ /*
+ * BEGIN: ZooKeeper tests.
+ */
+
+ public void testClearZKNodes() throws IOException, KeeperException,
+ InterruptedException {
+
+ // Clear any existing znode with the same path as bspRoot.
+ bspCluster.getBSPMaster().clearZKNodes();
+
+ int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
+ 6000);
+ String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
+ String bspRoot = configuration.get(Constants.ZOOKEEPER_ROOT,
+ Constants.DEFAULT_ZOOKEEPER_ROOT);
+
+ // Establishing a zk session.
+ ZooKeeper zk = new ZooKeeper(connectStr, timeout, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // Do nothing.(Dummy Watcher)
+ }
+ });
+
+ // Creating dummy bspRoot if it doesn't already exist.
+ Stat s = zk.exists(bspRoot, false);
+ if (s == null) {
+ zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+
+ // Creating dummy child nodes at depth 1.
+ String node1 = bspRoot + "/task1";
+ String node2 = bspRoot + "/task2";
+ zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ // Creating dummy child node at depth 2.
+ String node11 = node1 + "/superstep1";
+ zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot, false);
+ assertEquals(2, list.size());
+ System.out.println(list.size());
+
+ bspCluster.getBSPMaster().clearZKNodes();
+
+ list = (ArrayList<String>) zk.getChildren(bspRoot, false);
+ System.out.println(list.size());
+ assertEquals(0, list.size());
+
+ try {
+ zk.getData(node11, false, null);
+ fail();
+ } catch (KeeperException.NoNodeException e) {
+ System.out.println("Node has been removed correctly!");
+ }
}
+
+ /*
+ * END: ZooKeeper tests.
+ */
+
}