Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:57:38 UTC

svn commit: r1077263 - in /hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop: mapred/ mapreduce/test/system/ test/system/ test/system/process/

Author: omalley
Date: Fri Mar  4 03:57:38 2011
New Revision: 1077263

URL: http://svn.apache.org/viewvc?rev=1077263&view=rev
Log:
commit 60fd4d56096ea4e5a37e0f3b48223a59ec450eae
Author: Sharad Agarwal <sh...@yahoo-inc.com>
Date:   Tue Mar 2 10:45:50 2010 +0530

     patch from

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java Fri Mar  4 03:57:38 2011
@@ -19,7 +19,6 @@ import org.apache.hadoop.mapreduce.test.
 import org.apache.hadoop.mapreduce.test.system.TTInfo;
 import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
 import org.apache.hadoop.mapreduce.test.system.TaskInfo;
-import org.apache.hadoop.net.NetUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -47,35 +46,33 @@ public class TestCluster {
 
   @Test
   public void testProcessInfo() throws Exception {
-    LOG.info("Process info of master is : "
-        + cluster.getMaster().getProcessInfo());
-    Assert.assertNotNull(cluster.getMaster().getProcessInfo());
-    Collection<TTClient> slaves = cluster.getSlaves().values();
-    for (TTClient slave : slaves) {
-      LOG.info("Process info of slave is : " + slave.getProcessInfo());
-      Assert.assertNotNull(slave.getProcessInfo());
+    LOG.info("Process info of JobTracker is : "
+        + cluster.getJTClient().getProcessInfo());
+    Assert.assertNotNull(cluster.getJTClient().getProcessInfo());
+    Collection<TTClient> tts = cluster.getTTClients();
+    for (TTClient tt : tts) {
+      LOG.info("Process info of TaskTracker is : " + tt.getProcessInfo());
+      Assert.assertNotNull(tt.getProcessInfo());
     }
   }
   
   @Test
   public void testJobSubmission() throws Exception {
     Configuration conf = new Configuration(cluster.getConf());
-    JTProtocol wovenClient = cluster.getMaster().getProxy();
-    JobInfo[] jobs = wovenClient.getAllJobInfo();
     SleepJob job = new SleepJob();
     job.setConf(conf);
     conf = job.setupJobConf(1, 1, 100, 100, 100, 100);
-    RunningJob rJob = cluster.getMaster().submitAndVerifyJob(conf);
-    cluster.getMaster().verifyJobHistory(rJob.getID());
+    RunningJob rJob = cluster.getJTClient().submitAndVerifyJob(conf);
+    cluster.getJTClient().verifyJobHistory(rJob.getID());
   }
 
   @Test
   public void testFileStatus() throws Exception {
-    JTClient jt = cluster.getMaster();
+    JTClient jt = cluster.getJTClient();
     String dir = ".";
     checkFileStatus(jt.getFileStatus(dir, true));
     checkFileStatus(jt.listStatus(dir, false, true), dir);
-    for (TTClient tt : cluster.getSlaves().values()) {
+    for (TTClient tt : cluster.getTTClients()) {
       String[] localDirs = tt.getMapredLocalDirs();
       for (String localDir : localDirs) {
         checkFileStatus(tt.listStatus(localDir, true, false), localDir);
@@ -118,13 +115,13 @@ public class TestCluster {
   @Test
   public void testTaskStatus() throws Exception {
     Configuration conf = new Configuration(cluster.getConf());
-    JTProtocol wovenClient = cluster.getMaster().getProxy();
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
     FinishTaskControlAction.configureControlActionForJob(conf);
     SleepJob job = new SleepJob();
     job.setConf(conf);
 
     conf = job.setupJobConf(1, 0, 100, 100, 100, 100);
-    JobClient client = cluster.getMaster().getClient();
+    JobClient client = cluster.getJTClient().getClient();
 
     RunningJob rJob = client.submitJob(new JobConf(conf));
     JobID id = rJob.getID();
@@ -144,8 +141,7 @@ public class TestCluster {
         String[] taskTrackers = info.getTaskTrackers();
         for(String taskTracker : taskTrackers) {
           TTInfo ttInfo = wovenClient.getTTInfo(taskTracker);
-          TTClient ttCli =  cluster.getSlaves().get(
-              ttInfo.getStatus().getHost());
+          TTClient ttCli =  cluster.getTTClient(ttInfo.getStatus().getHost());
           TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(info.getTaskID());
           Assert.assertNotNull(ttTaskInfo);
           FinishTaskControlAction action = new FinishTaskControlAction(

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java Fri Mar  4 03:57:38 2011
@@ -40,13 +40,13 @@ public class TestControlledJob {
   @Test
   public void testControlledJob() throws Exception {
     Configuration conf = new Configuration(cluster.getConf());
-    JTProtocol wovenClient = cluster.getMaster().getProxy();
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
     FinishTaskControlAction.configureControlActionForJob(conf);
     SleepJob job = new SleepJob();
     job.setConf(conf);
     
     conf = job.setupJobConf(1, 0, 100, 100, 100, 100);
-    JobClient client = cluster.getMaster().getClient();
+    JobClient client = cluster.getJTClient().getClient();
     
     RunningJob rJob = client.submitJob(new JobConf(conf));
     JobID id = rJob.getID();
@@ -74,7 +74,7 @@ public class TestControlledJob {
       LOG.info("constructing control action to signal task to finish");
       FinishTaskControlAction action = new FinishTaskControlAction(
           TaskID.downgrade(info.getTaskID()));
-      for(TTClient cli : cluster.getSlaves().values()) {
+      for(TTClient cli : cluster.getTTClients()) {
         cli.getProxy().sendAction(action);
       }
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java Fri Mar  4 03:57:38 2011
@@ -64,7 +64,7 @@ public class TestSortValidate {
   @Before
   public void setUp() throws java.lang.Exception {
     cluster.setUp();
-    client = cluster.getMaster().getClient();
+    client = cluster.getJTClient().getClient();
 
     dfs = client.getFs();
     dfs.delete(SORT_INPUT_PATH, true);
@@ -97,7 +97,7 @@ public class TestSortValidate {
     int prevJobsNum = 0;
 
     // JTProtocol wovenClient
-    JTProtocol wovenClient = cluster.getMaster().getProxy();
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
 
     // JobStatus
     JobStatus[] jobStatus = null;
@@ -141,7 +141,7 @@ public class TestSortValidate {
       jInfo = wovenClient.getJobInfo(id);
     }
 
-    cluster.getMaster().verifyCompletedJob(id);
+    cluster.getJTClient().verifyCompletedJob(id);
   }
   
   private void runSort(Configuration job, Path sortInput, Path sortOutput) 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java Fri Mar  4 03:57:38 2011
@@ -54,7 +54,7 @@ public class TestTaskOwner {
 
     cluster = MRCluster.createCluster(new Configuration());
     cluster.setUp();
-    FileSystem fs = inDir.getFileSystem(cluster.getMaster().getConf());
+    FileSystem fs = inDir.getFileSystem(cluster.getJTClient().getConf());
     fs.create(inDir);
   }
 
@@ -65,7 +65,7 @@ public class TestTaskOwner {
   // in the cluster and we will authenticate whether matches
   // with the job that is submitted by the same user.
 
-    Configuration conf = cluster.getMaster().getConf();
+    Configuration conf = cluster.getJTClient().getConf();
     Job job = new Job(conf, "user name check");
 
     job.setJarByClass(UserNamePermission.class);
@@ -119,7 +119,7 @@ public class TestTaskOwner {
 
   @AfterClass
   public static void tearDown() throws java.lang.Exception {
-    FileSystem fs = outDir.getFileSystem(cluster.getMaster().getConf());
+    FileSystem fs = outDir.getFileSystem(cluster.getJTClient().getConf());
     fs.delete(outDir, true);
     cluster.tearDown();
    }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Fri Mar  4 03:57:38 2011
@@ -1,78 +1,118 @@
 package org.apache.hadoop.mapreduce.test.system;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.test.system.AbstractMasterSlaveCluster;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.AbstractDaemonCluster;
 import org.apache.hadoop.test.system.process.ClusterProcessManager;
-import org.apache.hadoop.test.system.process.ClusterProcessManagerFactory;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
 import org.apache.hadoop.test.system.process.RemoteProcess;
-import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
 
 /**
- * Concrete MasterSlaveCluster representing a Map-Reduce cluster.
+ * Concrete AbstractDaemonCluster representing a Map-Reduce cluster.
  * 
  */
-public class MRCluster extends AbstractMasterSlaveCluster<JTClient, 
-      TTClient> {
+@SuppressWarnings("unchecked")
+public class MRCluster extends AbstractDaemonCluster {
 
   private static final Log LOG = LogFactory.getLog(MRCluster.class);
+  public static final String CLUSTER_PROCESS_MGR_IMPL = 
+    "test.system.mr.clusterprocess.impl.class";
 
+  protected enum Role {JT, TT};
+  
   private MRCluster(Configuration conf, ClusterProcessManager rCluster)
       throws IOException {
     super(conf, rCluster);
   }
 
   /**
-   * Creates an instance of the Map-Reduce cluster.<br/>
-   * Example usage: <br/>
-   * <code>
-   * Configuration conf = new Configuration();<br/>
-   * conf.set(ClusterProcessManager.IMPL_CLASS,
-   * org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster.
-   * class.getName())<br/>
-   * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPHOME,
-   * "/path");<br/>
-   * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPCONFDIR,
-   * "/path");<br/>
-   * MRCluster cluster = MRCluster.createCluster(conf);
-   * </code>
+   * Factory method to create an instance of the Map-Reduce cluster.<br/>
    * 
    * @param conf
    *          contains all required parameter to create cluster.
    * @return a cluster instance to be managed.
-   * @throws IOException
    * @throws Exception
    */
   public static MRCluster createCluster(Configuration conf) 
-      throws IOException, Exception {
-    return new MRCluster(conf, ClusterProcessManagerFactory.createInstance(
-        ClusterType.MAPRED, conf));
+      throws Exception {
+    String implKlass = conf.get(CLUSTER_PROCESS_MGR_IMPL, System
+        .getProperty(CLUSTER_PROCESS_MGR_IMPL));
+    if (implKlass == null || implKlass.isEmpty()) {
+      implKlass = MRProcessManager.class.getName();
+    }
+    Class<ClusterProcessManager> klass = (Class<ClusterProcessManager>) Class
+      .forName(implKlass);
+    ClusterProcessManager clusterProcessMgr = klass.newInstance();
+    LOG.info("Created ClusterProcessManager as " + implKlass);
+    clusterProcessMgr.init(conf);
+    return new MRCluster(conf, clusterProcessMgr);
   }
 
-  @Override
-  protected JTClient createMaster(RemoteProcess masterDaemon)
+  protected JTClient createJTClient(RemoteProcess jtDaemon)
       throws IOException {
-    return new JTClient(getConf(), masterDaemon);
+    return new JTClient(getConf(), jtDaemon);
   }
 
-  @Override
-  protected TTClient createSlave(RemoteProcess slaveDaemon) 
+  protected TTClient createTTClient(RemoteProcess ttDaemon) 
       throws IOException {
-    return new TTClient(getConf(), slaveDaemon);
+    return new TTClient(getConf(), ttDaemon);
+  }
+
+  public JTClient getJTClient() {
+    Iterator<AbstractDaemonClient> it = getDaemons().get(Role.JT).iterator();
+    return (JTClient) it.next();
+  }
+
+  public List<TTClient> getTTClients() {
+    return (List) getDaemons().get(Role.TT);
+  }
+
+  public TTClient getTTClient(String hostname) {
+    for (TTClient c : getTTClients()) {
+      if (c.getHostName().equals(hostname)) {
+        return c;
+      }
+    }
+    return null;
   }
 
   @Override
   public void ensureClean() throws IOException {
     //TODO: ensure that no jobs/tasks are running
     //restart the cluster if cleanup fails
-    JTClient jtClient = getMaster();
+    JTClient jtClient = getJTClient();
     JobInfo[] jobs = jtClient.getProxy().getAllJobInfo();
     for(JobInfo job : jobs) {
       jtClient.getClient().killJob(
           org.apache.hadoop.mapred.JobID.downgrade(job.getID()));
     }
   }
+
+  @Override
+  protected AbstractDaemonClient createClient(
+      RemoteProcess process) throws IOException {
+    if (Role.JT.equals(process.getRole())) {
+      return createJTClient(process);
+    } else if (Role.TT.equals(process.getRole())) {
+      return createTTClient(process);
+    } else throw new IOException("Role: "+ process.getRole() + "  is not " +
+      "applicable to MRCluster");
+  }
+
+  public static class MRProcessManager extends HadoopDaemonRemoteCluster {
+    private static final List<HadoopDaemonInfo> mrDaemonInfos = 
+      Arrays.asList(new HadoopDaemonInfo[]{
+          new HadoopDaemonInfo("jobtracker", Role.JT, "masters"),
+          new HadoopDaemonInfo("tasktracker", Role.TT, "slaves")});
+    public MRProcessManager() {
+      super(mrDaemonInfos);
+    }
+  }
 }
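
A rough usage sketch for the new factory, since the example was dropped from
the javadoc (paths are placeholders; MRProcessManager is the default when
test.system.mr.clusterprocess.impl.class is unset):

    Configuration conf = new Configuration();
    // Optional: pick the ClusterProcessManager implementation explicitly.
    conf.set("test.system.mr.clusterprocess.impl.class",
        MRCluster.MRProcessManager.class.getName());
    // Required by HadoopDaemonRemoteCluster to locate the deployed cluster.
    conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPHOME, "/path");
    conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPCONFDIR, "/path");

    MRCluster cluster = MRCluster.createCluster(conf);
    cluster.setUp();
    JTClient jt = cluster.getJTClient();          // the single JobTracker client
    List<TTClient> tts = cluster.getTTClients();  // all TaskTracker clients
    cluster.tearDown();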

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java?rev=1077263&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java Fri Mar  4 03:57:38 2011
@@ -0,0 +1,223 @@
+package org.apache.hadoop.test.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.process.ClusterProcessManager;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * Abstract class which represents a cluster having multiple daemons.
+ */
+@SuppressWarnings("unchecked")
+public abstract class AbstractDaemonCluster {
+
+  private static final Log LOG = LogFactory.getLog(AbstractDaemonCluster.class);
+
+  private Configuration conf;
+  protected ClusterProcessManager clusterManager;
+  private Map<Enum<?>, List<AbstractDaemonClient>> daemons = 
+    new LinkedHashMap<Enum<?>, List<AbstractDaemonClient>>();
+  
+  /**
+   * Constructor to create a cluster client.<br/>
+   * 
+   * @param conf
+   *          Configuration to be used while constructing the cluster.
+   * @param rcluster
+   *          process manager instance to be used for managing the daemons.
+   * 
+   * @throws IOException
+   */
+  public AbstractDaemonCluster(Configuration conf,
+      ClusterProcessManager rcluster) throws IOException {
+    this.conf = conf;
+    this.clusterManager = rcluster;
+    createAllClients();
+  }
+
+  protected void createAllClients() throws IOException {
+    for (RemoteProcess p : clusterManager.getAllProcesses()) {
+      List<AbstractDaemonClient> dms = daemons.get(p.getRole());
+      if (dms == null) {
+        dms = new ArrayList<AbstractDaemonClient>();
+        daemons.put(p.getRole(), dms);
+      }
+      dms.add(createClient(p));
+    }
+  }
+  
+  /**
+   * Method to create the daemon client.<br/>
+   * 
+   * @param process
+   *          to manage the daemon.
+   * @return instance of the daemon client
+   * 
+   * @throws IOException
+   */
+  protected abstract AbstractDaemonClient<DaemonProtocol> 
+    createClient(RemoteProcess process) throws IOException;
+
+  /**
+   * Get the global cluster configuration which was used to create the 
+   * cluster. <br/>
+   * 
+   * @return global configuration of the cluster.
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Return the client handle of all the Daemons.<br/>
+   * 
+   * @return map of role to daemon clients' list.
+   */
+  public Map<Enum<?>, List<AbstractDaemonClient>> getDaemons() {
+    return daemons;
+  }
+
+  /**
+   * Checks if the cluster is ready for testing. <br/>
+   * Algorithm for checking is as follows : <br/>
+   * <ul>
+   * <li> Wait for Daemon to come up </li>
+   * <li> Check if daemon is ready </li>
+   * <li> If one of the daemons is not ready, return false </li>
+   * </ul> 
+   * 
+   * @return true if whole cluster is ready.
+   * 
+   * @throws IOException
+   */
+  public boolean isReady() throws IOException {
+    for (List<AbstractDaemonClient> set : daemons.values()) {
+      for (AbstractDaemonClient daemon : set) {
+        waitForDaemon(daemon);
+        if (!daemon.isReady()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  protected void waitForDaemon(AbstractDaemonClient d) {
+    while(true) {
+      try {
+        LOG.info("Waiting for daemon on host to come up : " + d.getHostName());
+        d.connect();
+        break;
+      } catch (IOException e) {
+        try {
+          Thread.sleep(10000);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+  }
+
+  /**
+   * Starts the cluster daemons.
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    clusterManager.start();
+  }
+
+  /**
+   * Stops the cluster daemons.
+   * @throws IOException
+   */
+  public void stop() throws IOException {
+    clusterManager.stop();
+  }
+
+  /**
+   * Connect to daemon RPC ports.
+   * @throws IOException
+   */
+  public void connect() throws IOException {
+    for (List<AbstractDaemonClient> set : daemons.values()) {
+      for (AbstractDaemonClient daemon : set) {
+        daemon.connect();
+      }
+    }
+  }
+
+  /**
+   * Disconnect from daemon RPC ports.
+   * @throws IOException
+   */
+  public void disconnect() throws IOException {
+    for (List<AbstractDaemonClient> set : daemons.values()) {
+      for (AbstractDaemonClient daemon : set) {
+        daemon.disconnect();
+      }
+    }
+  }
+
+  /**
+   * Ping all the daemons of the cluster.
+   * @throws IOException
+   */
+  public void ping() throws IOException {
+    for (List<AbstractDaemonClient> set : daemons.values()) {
+      for (AbstractDaemonClient daemon : set) {
+        LOG.info("Daemon is : " + daemon.getHostName() + " pinging....");
+        daemon.ping();
+      }
+    }
+  }
+
+  /**
+   * Connect to the cluster and ensure that it is clean to run tests.
+   * @throws Exception
+   */
+  public void setUp() throws Exception {
+    while (!isReady()) {
+      Thread.sleep(1000);
+    }
+    connect();
+    ping();
+    clearAllControlActions();
+    ensureClean();
+  }
+
+  public void clearAllControlActions() throws IOException {
+    for (List<AbstractDaemonClient> set : daemons.values()) {
+      for (AbstractDaemonClient daemon : set) {
+        LOG.info("Daemon is : " + daemon.getHostName() + " clearing actions....");
+        daemon.getProxy().clearActions();
+      }
+    }
+  }
+
+  /**
+   * Ensure that the cluster is clean to run tests.
+   * @throws IOException
+   */
+  public void ensureClean() throws IOException {
+  }
+
+  /**
+   * Ensure that the cluster is clean. Disconnect from the RPC ports of the daemons.
+   * @throws IOException
+   */
+  public void tearDown() throws IOException {
+    ensureClean();
+    clearAllControlActions();
+    disconnect();
+  }
+}
+
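
The lifecycle every concrete cluster inherits from AbstractDaemonCluster:
setUp() waits for isReady(), then connect(), ping(), clearAllControlActions()
and ensureClean(); tearDown() does the reverse. A minimal JUnit skeleton
against that contract, sketched here with MRCluster standing in for any
concrete subclass:

    public class ExampleClusterTest {
      private static MRCluster cluster;

      @BeforeClass
      public static void before() throws Exception {
        cluster = MRCluster.createCluster(new Configuration());
        cluster.setUp();    // wait for daemons, connect, ping, ensure clean
      }

      @Test
      public void testDaemonsRespond() throws Exception {
        cluster.ping();     // every daemon must answer on its RPC port
      }

      @AfterClass
      public static void after() throws Exception {
        cluster.tearDown(); // ensure clean, clear actions, disconnect
      }
    }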

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java Fri Mar  4 03:57:38 2011
@@ -1,68 +1,48 @@
 package org.apache.hadoop.test.system.process;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 
 /**
- * Interface to manage the remote processes in the master-slave cluster.
+ * Interface to manage the remote processes in the cluster.
  */
 public interface ClusterProcessManager {
 
   /**
-   * The configuration key to specify the concrete implementation of the
-   * {@link ClusterProcessManager} to be used by
-   * {@link ClusterProcessManagerFactory}.
-   */
-  String IMPL_CLASS = "test.system.clusterprocessmanager.impl.class";
-
-  /**
-   * Enumeration used to specify the types of the clusters which are supported
-   * by the concrete implementations of {@link ClusterProcessManager}.
-   */
-  public enum ClusterType {
-    MAPRED, HDFS
-  }
-  
-  /**
-   * Initialization method to set cluster type and also pass the configuration
-   * object which is required by the ClusterProcessManager to manage the 
-   * cluster.<br/>
+   * Initialization method to pass the configuration object which is required 
+   * by the ClusterProcessManager to manage the cluster.<br/>
    * Configuration object should typically contain all the parameters which are 
    * required by the implementations.<br/>
    *  
-   * @param t type of the cluster to be managed.
    * @param conf configuration containing values of the specific keys which 
   * are required by the implementation of the cluster process manager.
    * 
-   * @throws Exception when initialization fails.
+   * @throws IOException when initialization fails.
    */
-  void init(ClusterType t, Configuration conf) throws Exception;
+  void init(Configuration conf) throws IOException;
 
   /**
-   * Getter for master daemon process for managing the master daemon.<br/>
-   * 
-   * @return master daemon process.
+   * Get the list of RemoteProcess handles of all the remote processes.
    */
-  RemoteProcess getMaster();
+  List<RemoteProcess> getAllProcesses();
 
   /**
-   * Getter for slave daemon process for managing the slaves.<br/>
-   * 
-   * @return map of slave hosts to slave daemon process.
+   * Get all the roles this cluster's daemon processes have.
    */
-  Map<String, RemoteProcess> getSlaves();
+  Set<Enum<?>> getRoles();
 
   /**
-   * Method to start the cluster including all master and slaves.<br/>
+   * Method to start all the remote daemons.<br/>
    * 
    * @throws IOException if startup procedure fails.
    */
   void start() throws IOException;
 
   /**
-   * Method to shutdown all the master and slaves.<br/>
+   * Method to shutdown all the remote daemons.<br/>
    * 
    * @throws IOException if shutdown procedure fails.
    */
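
With the master/slave accessors gone, the whole contract is init(Configuration),
getAllProcesses(), getRoles(), start() and stop(). A skeletal implementation,
sketched for illustration (the class name and discovery logic are hypothetical;
only the interface methods are real):

    public class StubProcessManager implements ClusterProcessManager {
      private final List<RemoteProcess> processes =
          new ArrayList<RemoteProcess>();
      private final Set<Enum<?>> roles = new HashSet<Enum<?>>();

      @Override
      public void init(Configuration conf) throws IOException {
        // Discover daemon hosts from conf; fill processes and roles here.
      }

      @Override
      public List<RemoteProcess> getAllProcesses() { return processes; }

      @Override
      public Set<Enum<?>> getRoles() { return roles; }

      @Override
      public void start() throws IOException {
        for (RemoteProcess p : processes) p.start();
      }

      @Override
      public void stop() throws IOException {
        for (RemoteProcess p : processes) p.kill();
      }
    }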

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java Fri Mar  4 03:57:38 2011
@@ -7,7 +7,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,11 +28,12 @@ import org.apache.hadoop.util.Shell.Shel
  * Following will be the format which the final command execution would look : 
  * <br/>
  * <code>
- *  ssh master-host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName 
- *  --config HADOOP_CONF_DIR (start|stop) masterCommand'
+ *  ssh host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName 
+ *  --config HADOOP_CONF_DIR (start|stop) command'
  * </code>
  */
-public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
+public abstract class HadoopDaemonRemoteCluster 
+    implements ClusterProcessManager {
 
   private static final Log LOG = LogFactory
       .getLog(HadoopDaemonRemoteCluster.class.getName());
@@ -53,62 +56,46 @@ public class HadoopDaemonRemoteCluster i
   private String hadoopHome;
   private String hadoopConfDir;
   private String deployed_hadoopConfDir;
-  private String masterCommand;
-  private String slaveCommand;
+  private final Set<Enum<?>> roles;
 
-  private RemoteProcess master;
-  private Map<String, RemoteProcess> slaves;
+  private final List<HadoopDaemonInfo> daemonInfos;
+  private List<RemoteProcess> processes;
+
+  public static class HadoopDaemonInfo {
+    public final String cmd;
+    public final Enum<?> role;
+    public final String hostFile;
+    public HadoopDaemonInfo(String cmd, Enum<?> role, String hostFile) {
+      super();
+      this.cmd = cmd;
+      this.role = role;
+      this.hostFile = hostFile;
+    }
+  }
+
+  public HadoopDaemonRemoteCluster(List<HadoopDaemonInfo> daemonInfos) {
+    this.daemonInfos = daemonInfos;
+    this.roles = new HashSet<Enum<?>>();
+    for (HadoopDaemonInfo info : daemonInfos) {
+      this.roles.add(info.role);
+    }
+  }
 
   @Override
-  public void init(ClusterType t, Configuration conf) throws Exception {
-    /*
-     * Initialization strategy of the HadoopDaemonRemoteCluster is three staged
-     * process: 1. Populate script names based on the type of passed cluster. 2.
-     * Populate the required directories. 3. Populate the master and slaves.
-     */
-    populateScriptNames(t);
+  public void init(Configuration conf) throws IOException {
     populateDirectories(conf);
-    this.slaves = new HashMap<String, RemoteProcess>();
+    this.processes = new ArrayList<RemoteProcess>();
     populateDaemons(deployed_hadoopConfDir);
   }
 
-  /**
-   * Method to populate the required master and slave commands which are used to
-   * manage the cluster.<br/>
-   * 
-   * @param t
-   *          type of cluster to be initialized.
-   * 
-   * @throws UnsupportedOperationException
-   *           if the passed cluster type is not MAPRED or HDFS
-   */
-  private void populateScriptNames(ClusterType t) {
-    switch (t) {
-    case MAPRED:
-      masterCommand = "jobtracker";
-      slaveCommand = "tasktracker";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Created mapred hadoop daemon remote cluster manager with "
-            + "scriptName: mapred, masterCommand: jobtracker, "
-            + "slaveCommand: tasktracker");
-      }
-      break;
-    case HDFS:
-      masterCommand = "namenode";
-      slaveCommand = "datanode";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Created hdfs hadoop daemon remote cluster manager with "
-            + "scriptName: hdfs, masterCommand: namenode, "
-            + "slaveCommand: datanode");
-      }
-      break;
-    default:
-      LOG.error("Cluster type :" + t
-          + "is not supported currently by HadoopDaemonRemoteCluster");
-      throw new UnsupportedOperationException(
-          "The specified cluster type is not supported by the " +
-          "HadoopDaemonRemoteCluster");
-    }
+  @Override
+  public List<RemoteProcess> getAllProcesses() {
+    return processes;
+  }
+
+  @Override
+  public Set<Enum<?>> getRoles() {
+    return roles;
   }
 
   /**
@@ -116,14 +103,14 @@ public class HadoopDaemonRemoteCluster i
    * 
    * @param conf
    *          Configuration object containing values for
-   *          TEST_SYSTEM_HADOOPHOME_CONF_KEY and
-   *          TEST_SYSTEM_HADOOPCONFDIR_CONF_KEY
+   *          CONF_HADOOPHOME and
+   *          CONF_HADOOPCONFDIR
    * 
    * @throws IllegalArgumentException
    *           if the configuration or system property set does not contain
    *           values for the required keys.
    */
-  private void populateDirectories(Configuration conf) {
+  protected void populateDirectories(Configuration conf) {
     hadoopHome = conf.get(CONF_HADOOPHOME, System
         .getProperty(CONF_HADOOPHOME));
     hadoopConfDir = conf.get(CONF_HADOOPCONFDIR, System
@@ -147,69 +134,57 @@ public class HadoopDaemonRemoteCluster i
   }
 
   @Override
-  public RemoteProcess getMaster() {
-    return master;
-  }
-
-  @Override
-  public Map<String, RemoteProcess> getSlaves() {
-    return slaves;
-  }
-
-  @Override
   public void start() throws IOException {
-    // start master first.
-    master.start();
-    for (RemoteProcess slave : slaves.values()) {
-      slave.start();
+    for (RemoteProcess process : processes) {
+      process.start();
     }
   }
 
   @Override
   public void stop() throws IOException {
-    master.kill();
-    for (RemoteProcess slave : slaves.values()) {
-      slave.kill();
+    for (RemoteProcess process : processes) {
+      process.kill();
     }
   }
 
-  private void populateDaemons(String confLocation) throws IOException {
-    File mastersFile = new File(confLocation, "masters");
-    File slavesFile = new File(confLocation, "slaves");
+  protected void populateDaemon(String confLocation, 
+      HadoopDaemonInfo info) throws IOException {
+    File hostFile = new File(confLocation, info.hostFile);
     BufferedReader reader = null;
+    reader = new BufferedReader(new FileReader(hostFile));
+    String host = null;
     try {
-      reader = new BufferedReader(new FileReader(mastersFile));
-      String masterHost = null;
-      masterHost = reader.readLine();
-      if (masterHost != null && !masterHost.trim().isEmpty()) {
-        master = new ScriptDaemon(masterCommand, masterHost);
+      boolean foundAtLeastOne = false;
+      while ((host = reader.readLine()) != null) {
+        if (host.trim().isEmpty()) {
+          throw new IllegalArgumentException(
+          "Hostname could not be found in file " + info.hostFile);
+        }
+        InetAddress addr = InetAddress.getByName(host);
+        RemoteProcess process = new ScriptDaemon(info.cmd, 
+            addr.getCanonicalHostName(), info.role);
+        processes.add(process);
+        foundAtLeastOne = true;
+      }
+      if (!foundAtLeastOne) {
+        throw new IllegalArgumentException("At least one hostname " +
+          "is required to be present in file - " + info.hostFile);
       }
     } finally {
       try {
         reader.close();
       } catch (Exception e) {
-        LOG.error("Can't read masters file from " + confLocation);
-      }
-
-    }
-    try {
-      reader = new BufferedReader(new FileReader(slavesFile));
-      String slaveHost = null;
-      while ((slaveHost = reader.readLine()) != null) {
-        InetAddress addr = InetAddress.getByName(slaveHost);
-        RemoteProcess slave = new ScriptDaemon(slaveCommand, 
-            addr.getCanonicalHostName());
-        slaves.put(addr.getCanonicalHostName(), slave);
-      }
-    } finally {
-      try {
-        reader.close();
-      } catch (Exception e) {
-        LOG.error("Can't read slaves file from " + confLocation);
+        LOG.warn("Could not close reader");
       }
     }
   }
 
+  protected void populateDaemons(String confLocation) throws IOException {
+    for (HadoopDaemonInfo info : daemonInfos) {
+      populateDaemon(confLocation, info);
+    }
+  }
+
   /**
    * The core daemon class which actually implements the remote process
    * management of actual daemon processes in the cluster.
@@ -222,10 +197,12 @@ public class HadoopDaemonRemoteCluster i
     private static final String SCRIPT_NAME = "hadoop-daemon.sh";
     private final String daemonName;
     private final String hostName;
+    private final Enum<?> role;
 
-    public ScriptDaemon(String daemonName, String hostName) {
+    public ScriptDaemon(String daemonName, String hostName, Enum<?> role) {
       this.daemonName = daemonName;
       this.hostName = hostName;
+      this.role = role;
     }
 
     @Override
@@ -272,6 +249,10 @@ public class HadoopDaemonRemoteCluster i
     public void start() throws IOException {
       buildCommandExecutor(START_COMMAND).execute();
     }
-  }
 
+    @Override
+    public Enum<?> getRole() {
+      return role;
+    }
+  }
 }
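
Because the class is now abstract and driven by a list of HadoopDaemonInfo
instead of the old MAPRED/HDFS switch, each cluster type supplies its own
subclass; MRCluster.MRProcessManager above is the Map-Reduce one. An HDFS
analogue would look roughly like this sketch (the Role enum and class name
are hypothetical; the commands and host files come from the removed switch):

    public class HDFSProcessManager extends HadoopDaemonRemoteCluster {
      public enum Role { NN, DN }

      private static final List<HadoopDaemonInfo> hdfsDaemonInfos =
          Arrays.asList(new HadoopDaemonInfo[] {
              new HadoopDaemonInfo("namenode", Role.NN, "masters"),
              new HadoopDaemonInfo("datanode", Role.DN, "slaves")});

      public HDFSProcessManager() {
        super(hdfsDaemonInfos);
      }
    }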

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java Fri Mar  4 03:57:38 2011
@@ -26,4 +26,11 @@ public interface RemoteProcess {
    * @throws IOException if shutdown fails.
    */
   void kill() throws IOException;
+
+  /**
+   * Get the role of the Daemon in the cluster.
+   * 
+   * @return the role of the daemon in the cluster.
+   */
+  Enum<?> getRole();
 }
\ No newline at end of file