You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:57:38 UTC
svn commit: r1077263 - in
/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop:
mapred/ mapreduce/test/system/ test/system/ test/system/process/
Author: omalley
Date: Fri Mar 4 03:57:38 2011
New Revision: 1077263
URL: http://svn.apache.org/viewvc?rev=1077263&view=rev
Log:
commit 60fd4d56096ea4e5a37e0f3b48223a59ec450eae
Author: Sharad Agarwal <sh...@yahoo-inc.com>
Date: Tue Mar 2 10:45:50 2010 +0530
patch from
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java
Removed:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java Fri Mar 4 03:57:38 2011
@@ -19,7 +19,6 @@ import org.apache.hadoop.mapreduce.test.
import org.apache.hadoop.mapreduce.test.system.TTInfo;
import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
import org.apache.hadoop.mapreduce.test.system.TaskInfo;
-import org.apache.hadoop.net.NetUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -47,35 +46,33 @@ public class TestCluster {
@Test
public void testProcessInfo() throws Exception {
- LOG.info("Process info of master is : "
- + cluster.getMaster().getProcessInfo());
- Assert.assertNotNull(cluster.getMaster().getProcessInfo());
- Collection<TTClient> slaves = cluster.getSlaves().values();
- for (TTClient slave : slaves) {
- LOG.info("Process info of slave is : " + slave.getProcessInfo());
- Assert.assertNotNull(slave.getProcessInfo());
+ LOG.info("Process info of JobTracker is : "
+ + cluster.getJTClient().getProcessInfo());
+ Assert.assertNotNull(cluster.getJTClient().getProcessInfo());
+ Collection<TTClient> tts = cluster.getTTClients();
+ for (TTClient tt : tts) {
+ LOG.info("Process info of TaskTracker is : " + tt.getProcessInfo());
+ Assert.assertNotNull(tt.getProcessInfo());
}
}
@Test
public void testJobSubmission() throws Exception {
Configuration conf = new Configuration(cluster.getConf());
- JTProtocol wovenClient = cluster.getMaster().getProxy();
- JobInfo[] jobs = wovenClient.getAllJobInfo();
SleepJob job = new SleepJob();
job.setConf(conf);
conf = job.setupJobConf(1, 1, 100, 100, 100, 100);
- RunningJob rJob = cluster.getMaster().submitAndVerifyJob(conf);
- cluster.getMaster().verifyJobHistory(rJob.getID());
+ RunningJob rJob = cluster.getJTClient().submitAndVerifyJob(conf);
+ cluster.getJTClient().verifyJobHistory(rJob.getID());
}
@Test
public void testFileStatus() throws Exception {
- JTClient jt = cluster.getMaster();
+ JTClient jt = cluster.getJTClient();
String dir = ".";
checkFileStatus(jt.getFileStatus(dir, true));
checkFileStatus(jt.listStatus(dir, false, true), dir);
- for (TTClient tt : cluster.getSlaves().values()) {
+ for (TTClient tt : cluster.getTTClients()) {
String[] localDirs = tt.getMapredLocalDirs();
for (String localDir : localDirs) {
checkFileStatus(tt.listStatus(localDir, true, false), localDir);
@@ -118,13 +115,13 @@ public class TestCluster {
@Test
public void testTaskStatus() throws Exception {
Configuration conf = new Configuration(cluster.getConf());
- JTProtocol wovenClient = cluster.getMaster().getProxy();
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
FinishTaskControlAction.configureControlActionForJob(conf);
SleepJob job = new SleepJob();
job.setConf(conf);
conf = job.setupJobConf(1, 0, 100, 100, 100, 100);
- JobClient client = cluster.getMaster().getClient();
+ JobClient client = cluster.getJTClient().getClient();
RunningJob rJob = client.submitJob(new JobConf(conf));
JobID id = rJob.getID();
@@ -144,8 +141,7 @@ public class TestCluster {
String[] taskTrackers = info.getTaskTrackers();
for(String taskTracker : taskTrackers) {
TTInfo ttInfo = wovenClient.getTTInfo(taskTracker);
- TTClient ttCli = cluster.getSlaves().get(
- ttInfo.getStatus().getHost());
+ TTClient ttCli = cluster.getTTClient(ttInfo.getStatus().getHost());
TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(info.getTaskID());
Assert.assertNotNull(ttTaskInfo);
FinishTaskControlAction action = new FinishTaskControlAction(
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java Fri Mar 4 03:57:38 2011
@@ -40,13 +40,13 @@ public class TestControlledJob {
@Test
public void testControlledJob() throws Exception {
Configuration conf = new Configuration(cluster.getConf());
- JTProtocol wovenClient = cluster.getMaster().getProxy();
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
FinishTaskControlAction.configureControlActionForJob(conf);
SleepJob job = new SleepJob();
job.setConf(conf);
conf = job.setupJobConf(1, 0, 100, 100, 100, 100);
- JobClient client = cluster.getMaster().getClient();
+ JobClient client = cluster.getJTClient().getClient();
RunningJob rJob = client.submitJob(new JobConf(conf));
JobID id = rJob.getID();
@@ -74,7 +74,7 @@ public class TestControlledJob {
LOG.info("constructing control action to signal task to finish");
FinishTaskControlAction action = new FinishTaskControlAction(
TaskID.downgrade(info.getTaskID()));
- for(TTClient cli : cluster.getSlaves().values()) {
+ for(TTClient cli : cluster.getTTClients()) {
cli.getProxy().sendAction(action);
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java Fri Mar 4 03:57:38 2011
@@ -64,7 +64,7 @@ public class TestSortValidate {
@Before
public void setUp() throws java.lang.Exception {
cluster.setUp();
- client = cluster.getMaster().getClient();
+ client = cluster.getJTClient().getClient();
dfs = client.getFs();
dfs.delete(SORT_INPUT_PATH, true);
@@ -97,7 +97,7 @@ public class TestSortValidate {
int prevJobsNum = 0;
// JTProtocol wovenClient
- JTProtocol wovenClient = cluster.getMaster().getProxy();
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
// JobStatus
JobStatus[] jobStatus = null;
@@ -141,7 +141,7 @@ public class TestSortValidate {
jInfo = wovenClient.getJobInfo(id);
}
- cluster.getMaster().verifyCompletedJob(id);
+ cluster.getJTClient().verifyCompletedJob(id);
}
private void runSort(Configuration job, Path sortInput, Path sortOutput)
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java Fri Mar 4 03:57:38 2011
@@ -54,7 +54,7 @@ public class TestTaskOwner {
cluster = MRCluster.createCluster(new Configuration());
cluster.setUp();
- FileSystem fs = inDir.getFileSystem(cluster.getMaster().getConf());
+ FileSystem fs = inDir.getFileSystem(cluster.getJTClient().getConf());
fs.create(inDir);
}
@@ -65,7 +65,7 @@ public class TestTaskOwner {
// in the cluster and we will authenticate whether matches
// with the job that is submitted by the same user.
- Configuration conf = cluster.getMaster().getConf();
+ Configuration conf = cluster.getJTClient().getConf();
Job job = new Job(conf, "user name check");
job.setJarByClass(UserNamePermission.class);
@@ -119,7 +119,7 @@ public class TestTaskOwner {
@AfterClass
public static void tearDown() throws java.lang.Exception {
- FileSystem fs = outDir.getFileSystem(cluster.getMaster().getConf());
+ FileSystem fs = outDir.getFileSystem(cluster.getJTClient().getConf());
fs.delete(outDir, true);
cluster.tearDown();
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Fri Mar 4 03:57:38 2011
@@ -1,78 +1,118 @@
package org.apache.hadoop.mapreduce.test.system;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.test.system.AbstractMasterSlaveCluster;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.AbstractDaemonCluster;
import org.apache.hadoop.test.system.process.ClusterProcessManager;
-import org.apache.hadoop.test.system.process.ClusterProcessManagerFactory;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
import org.apache.hadoop.test.system.process.RemoteProcess;
-import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
/**
- * Concrete MasterSlaveCluster representing a Map-Reduce cluster.
+ * Concrete AbstractDaemonCluster representing a Map-Reduce cluster.
*
*/
-public class MRCluster extends AbstractMasterSlaveCluster<JTClient,
- TTClient> {
+@SuppressWarnings("unchecked")
+public class MRCluster extends AbstractDaemonCluster {
private static final Log LOG = LogFactory.getLog(MRCluster.class);
+ public static final String CLUSTER_PROCESS_MGR_IMPL =
+ "test.system.mr.clusterprocess.impl.class";
+ protected enum Role {JT, TT};
+
private MRCluster(Configuration conf, ClusterProcessManager rCluster)
throws IOException {
super(conf, rCluster);
}
/**
- * Creates an instance of the Map-Reduce cluster.<br/>
- * Example usage: <br/>
- * <code>
- * Configuration conf = new Configuration();<br/>
- * conf.set(ClusterProcessManager.IMPL_CLASS,
- * org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster.
- * class.getName())<br/>
- * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPHOME,
- * "/path");<br/>
- * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPCONFDIR,
- * "/path");<br/>
- * MRCluster cluster = MRCluster.createCluster(conf);
- * </code>
+ * Factory method to create an instance of the Map-Reduce cluster.<br/>
*
* @param conf
* contains all required parameter to create cluster.
* @return a cluster instance to be managed.
- * @throws IOException
* @throws Exception
*/
public static MRCluster createCluster(Configuration conf)
- throws IOException, Exception {
- return new MRCluster(conf, ClusterProcessManagerFactory.createInstance(
- ClusterType.MAPRED, conf));
+ throws Exception {
+ String implKlass = conf.get(CLUSTER_PROCESS_MGR_IMPL, System
+ .getProperty(CLUSTER_PROCESS_MGR_IMPL));
+ if (implKlass == null || implKlass.isEmpty()) {
+ implKlass = MRProcessManager.class.getName();
+ }
+ Class<ClusterProcessManager> klass = (Class<ClusterProcessManager>) Class
+ .forName(implKlass);
+ ClusterProcessManager clusterProcessMgr = klass.newInstance();
+ LOG.info("Created ClusterProcessManager as " + implKlass);
+ clusterProcessMgr.init(conf);
+ return new MRCluster(conf, clusterProcessMgr);
}
- @Override
- protected JTClient createMaster(RemoteProcess masterDaemon)
+ protected JTClient createJTClient(RemoteProcess jtDaemon)
throws IOException {
- return new JTClient(getConf(), masterDaemon);
+ return new JTClient(getConf(), jtDaemon);
}
- @Override
- protected TTClient createSlave(RemoteProcess slaveDaemon)
+ protected TTClient createTTClient(RemoteProcess ttDaemon)
throws IOException {
- return new TTClient(getConf(), slaveDaemon);
+ return new TTClient(getConf(), ttDaemon);
+ }
+
+ public JTClient getJTClient() {
+ Iterator<AbstractDaemonClient> it = getDaemons().get(Role.JT).iterator();
+ return (JTClient) it.next();
+ }
+
+ public List<TTClient> getTTClients() {
+ return (List) getDaemons().get(Role.TT);
+ }
+
+ public TTClient getTTClient(String hostname) {
+ for (TTClient c : getTTClients()) {
+ if (c.getHostName().equals(hostname)) {
+ return c;
+ }
+ }
+ return null;
}
@Override
public void ensureClean() throws IOException {
//TODO: ensure that no jobs/tasks are running
//restart the cluster if cleanup fails
- JTClient jtClient = getMaster();
+ JTClient jtClient = getJTClient();
JobInfo[] jobs = jtClient.getProxy().getAllJobInfo();
for(JobInfo job : jobs) {
jtClient.getClient().killJob(
org.apache.hadoop.mapred.JobID.downgrade(job.getID()));
}
}
+
+ @Override
+ protected AbstractDaemonClient createClient(
+ RemoteProcess process) throws IOException {
+ if (Role.JT.equals(process.getRole())) {
+ return createJTClient(process);
+ } else if (Role.TT.equals(process.getRole())) {
+ return createTTClient(process);
+ } else throw new IOException("Role: "+ process.getRole() + " is not " +
+ "applicable to MRCluster");
+ }
+
+ public static class MRProcessManager extends HadoopDaemonRemoteCluster{
+ private static final List<HadoopDaemonInfo> mrDaemonInfos =
+ Arrays.asList(new HadoopDaemonInfo[]{
+ new HadoopDaemonInfo("jobtracker", Role.JT, "masters"),
+ new HadoopDaemonInfo("tasktracker", Role.TT, "slaves")});
+ public MRProcessManager() {
+ super(mrDaemonInfos);
+ }
+ }
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java?rev=1077263&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java Fri Mar 4 03:57:38 2011
@@ -0,0 +1,223 @@
+package org.apache.hadoop.test.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.process.ClusterProcessManager;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * Abstract class which represents a cluster having multiple daemons.
+ */
+@SuppressWarnings("unchecked")
+public abstract class AbstractDaemonCluster {
+
+ private static final Log LOG = LogFactory.getLog(AbstractDaemonCluster.class);
+
+ private Configuration conf;
+ protected ClusterProcessManager clusterManager;
+ private Map<Enum<?>, List<AbstractDaemonClient>> daemons =
+ new LinkedHashMap<Enum<?>, List<AbstractDaemonClient>>();
+
+ /**
+ * Constructor to create a cluster client.<br/>
+ *
+ * @param conf
+ * Configuration to be used while constructing the cluster.
+ * @param rcluster
+ * process manager instance to be used for managing the daemons.
+ *
+ * @throws IOException
+ */
+ public AbstractDaemonCluster(Configuration conf,
+ ClusterProcessManager rcluster) throws IOException {
+ this.conf = conf;
+ this.clusterManager = rcluster;
+ createAllClients();
+ }
+
+ protected void createAllClients() throws IOException {
+ for (RemoteProcess p : clusterManager.getAllProcesses()) {
+ List<AbstractDaemonClient> dms = daemons.get(p.getRole());
+ if (dms == null) {
+ dms = new ArrayList<AbstractDaemonClient>();
+ daemons.put(p.getRole(), dms);
+ }
+ dms.add(createClient(p));
+ }
+ }
+
+ /**
+ * Method to create the daemon client.<br/>
+ *
+ * @param process
+ * remote process used to manage the daemon.
+ * @return instance of the daemon client
+ *
+ * @throws IOException
+ */
+ protected abstract AbstractDaemonClient<DaemonProtocol>
+ createClient(RemoteProcess process) throws IOException;
+
+ /**
+ * Get the global cluster configuration which was used to create the
+ * cluster. <br/>
+ *
+ * @return global configuration of the cluster.
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ *
+
+ /**
+ * Return the client handle of all the Daemons.<br/>
+ *
+ * @return map of role to daemon clients' list.
+ */
+ public Map<Enum<?>, List<AbstractDaemonClient>> getDaemons() {
+ return daemons;
+ }
+
+ /**
+ * Checks if the cluster is ready for testing. <br/>
+ * Algorithm for checking is as follows : <br/>
+ * <ul>
+ * <li> Wait for Daemon to come up </li>
+ * <li> Check if daemon is ready </li>
+ * <li> If any of the daemons is not ready, return false </li>
+ * </ul>
+ *
+ * @return true if whole cluster is ready.
+ *
+ * @throws IOException
+ */
+ public boolean isReady() throws IOException {
+ for (List<AbstractDaemonClient> set : daemons.values()) {
+ for (AbstractDaemonClient daemon : set) {
+ waitForDaemon(daemon);
+ if (!daemon.isReady()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ protected void waitForDaemon(AbstractDaemonClient d) {
+ while(true) {
+ try {
+ LOG.info("Waiting for daemon in host to come up : " + d.getHostName());
+ d.connect();
+ break;
+ } catch (IOException e) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+
+ /**
+ * Starts the cluster daemons.
+ * @throws IOException
+ */
+ public void start() throws IOException {
+ clusterManager.start();
+ }
+
+ /**
+ * Stops the cluster daemons.
+ * @throws IOException
+ */
+ public void stop() throws IOException {
+ clusterManager.stop();
+ }
+
+ /**
+ * Connect to daemon RPC ports.
+ * @throws IOException
+ */
+ public void connect() throws IOException {
+ for (List<AbstractDaemonClient> set : daemons.values()) {
+ for (AbstractDaemonClient daemon : set) {
+ daemon.connect();
+ }
+ }
+ }
+
+ /**
+ * Disconnect from daemon RPC ports.
+ * @throws IOException
+ */
+ public void disconnect() throws IOException {
+ for (List<AbstractDaemonClient> set : daemons.values()) {
+ for (AbstractDaemonClient daemon : set) {
+ daemon.disconnect();
+ }
+ }
+ }
+
+ /**
+ * Ping all the daemons of the cluster.
+ * @throws IOException
+ */
+ public void ping() throws IOException {
+ for (List<AbstractDaemonClient> set : daemons.values()) {
+ for (AbstractDaemonClient daemon : set) {
+ LOG.info("Daemon is : " + daemon.getHostName() + " pinging....");
+ daemon.ping();
+ }
+ }
+ }
+
+ /**
+ * Connect to the cluster and ensure that it is clean to run tests.
+ * @throws Exception
+ */
+ public void setUp() throws Exception {
+ while (!isReady()) {
+ Thread.sleep(1000);
+ }
+ connect();
+ ping();
+ clearAllControlActions();
+ ensureClean();
+ }
+
+ public void clearAllControlActions() throws IOException {
+ for (List<AbstractDaemonClient> set : daemons.values()) {
+ for (AbstractDaemonClient daemon : set) {
+ LOG.info("Daemon is : " + daemon.getHostName() + " pinging....");
+ daemon.getProxy().clearActions();
+ }
+ }
+ }
+
+ /**
+ * Ensure that the cluster is clean to run tests.
+ * @throws IOException
+ */
+ public void ensureClean() throws IOException {
+ }
+
+ /**
+ * Ensure that cluster is clean. Disconnect from the RPC ports of the daemons.
+ * @throws IOException
+ */
+ public void tearDown() throws IOException {
+ ensureClean();
+ clearAllControlActions();
+ disconnect();
+ }
+}
+
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java Fri Mar 4 03:57:38 2011
@@ -1,68 +1,48 @@
package org.apache.hadoop.test.system.process;
import java.io.IOException;
-import java.util.Map;
+import java.util.List;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
/**
- * Interface to manage the remote processes in the master-slave cluster.
+ * Interface to manage the remote processes in the cluster.
*/
public interface ClusterProcessManager {
/**
- * The configuration key to specify the concrete implementation of the
- * {@link ClusterProcessManager} to be used by
- * {@link ClusterProcessManagerFactory}.
- */
- String IMPL_CLASS = "test.system.clusterprocessmanager.impl.class";
-
- /**
- * Enumeration used to specify the types of the clusters which are supported
- * by the concrete implementations of {@link ClusterProcessManager}.
- */
- public enum ClusterType {
- MAPRED, HDFS
- }
-
- /**
- * Initialization method to set cluster type and also pass the configuration
- * object which is required by the ClusterProcessManager to manage the
- * cluster.<br/>
+ * Initialization method to pass the configuration object which is required
+ * by the ClusterProcessManager to manage the cluster.<br/>
* Configuration object should typically contain all the parameters which are
* required by the implementations.<br/>
*
- * @param t type of the cluster to be managed.
* @param conf configuration containing values of the specific keys which
* are required by the implementation of the cluster process manger.
*
- * @throws Exception when initialization fails.
+ * @throws IOException when initialization fails.
*/
- void init(ClusterType t, Configuration conf) throws Exception;
+ void init(Configuration conf) throws IOException;
/**
- * Getter for master daemon process for managing the master daemon.<br/>
- *
- * @return master daemon process.
+ * Get the list of RemoteProcess handles of all the remote processes.
*/
- RemoteProcess getMaster();
+ List<RemoteProcess> getAllProcesses();
/**
- * Getter for slave daemon process for managing the slaves.<br/>
- *
- * @return map of slave hosts to slave daemon process.
+ * Get all the roles this cluster's daemon processes have.
*/
- Map<String, RemoteProcess> getSlaves();
+ Set<Enum<?>> getRoles();
/**
- * Method to start the cluster including all master and slaves.<br/>
+ * Method to start all the remote daemons.<br/>
*
* @throws IOException if startup procedure fails.
*/
void start() throws IOException;
/**
- * Method to shutdown all the master and slaves.<br/>
+ * Method to shutdown all the remote daemons.<br/>
*
* @throws IOException if shutdown procedure fails.
*/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java Fri Mar 4 03:57:38 2011
@@ -7,7 +7,9 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -26,11 +28,12 @@ import org.apache.hadoop.util.Shell.Shel
* Following will be the format which the final command execution would look :
* <br/>
* <code>
- * ssh master-host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName
- * --config HADOOP_CONF_DIR (start|stop) masterCommand'
+ * ssh host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName
+ * --config HADOOP_CONF_DIR (start|stop) command'
* </code>
*/
-public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
+public abstract class HadoopDaemonRemoteCluster
+ implements ClusterProcessManager {
private static final Log LOG = LogFactory
.getLog(HadoopDaemonRemoteCluster.class.getName());
@@ -53,62 +56,46 @@ public class HadoopDaemonRemoteCluster i
private String hadoopHome;
private String hadoopConfDir;
private String deployed_hadoopConfDir;
- private String masterCommand;
- private String slaveCommand;
+ private final Set<Enum<?>> roles;
- private RemoteProcess master;
- private Map<String, RemoteProcess> slaves;
+ private final List<HadoopDaemonInfo> daemonInfos;
+ private List<RemoteProcess> processes;
+
+ public static class HadoopDaemonInfo {
+ public final String cmd;
+ public final Enum<?> role;
+ public final String hostFile;
+ public HadoopDaemonInfo(String cmd, Enum<?> role, String hostFile) {
+ super();
+ this.cmd = cmd;
+ this.role = role;
+ this.hostFile = hostFile;
+ }
+ }
+
+ public HadoopDaemonRemoteCluster(List<HadoopDaemonInfo> daemonInfos) {
+ this.daemonInfos = daemonInfos;
+ this.roles = new HashSet<Enum<?>>();
+ for (HadoopDaemonInfo info : daemonInfos) {
+ this.roles.add(info.role);
+ }
+ }
@Override
- public void init(ClusterType t, Configuration conf) throws Exception {
- /*
- * Initialization strategy of the HadoopDaemonRemoteCluster is three staged
- * process: 1. Populate script names based on the type of passed cluster. 2.
- * Populate the required directories. 3. Populate the master and slaves.
- */
- populateScriptNames(t);
+ public void init(Configuration conf) throws IOException {
populateDirectories(conf);
- this.slaves = new HashMap<String, RemoteProcess>();
+ this.processes = new ArrayList<RemoteProcess>();
populateDaemons(deployed_hadoopConfDir);
}
- /**
- * Method to populate the required master and slave commands which are used to
- * manage the cluster.<br/>
- *
- * @param t
- * type of cluster to be initialized.
- *
- * @throws UnsupportedOperationException
- * if the passed cluster type is not MAPRED or HDFS
- */
- private void populateScriptNames(ClusterType t) {
- switch (t) {
- case MAPRED:
- masterCommand = "jobtracker";
- slaveCommand = "tasktracker";
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created mapred hadoop daemon remote cluster manager with "
- + "scriptName: mapred, masterCommand: jobtracker, "
- + "slaveCommand: tasktracker");
- }
- break;
- case HDFS:
- masterCommand = "namenode";
- slaveCommand = "datanode";
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created hdfs hadoop daemon remote cluster manager with "
- + "scriptName: hdfs, masterCommand: namenode, "
- + "slaveCommand: datanode");
- }
- break;
- default:
- LOG.error("Cluster type :" + t
- + "is not supported currently by HadoopDaemonRemoteCluster");
- throw new UnsupportedOperationException(
- "The specified cluster type is not supported by the " +
- "HadoopDaemonRemoteCluster");
- }
+ @Override
+ public List<RemoteProcess> getAllProcesses() {
+ return processes;
+ }
+
+ @Override
+ public Set<Enum<?>> getRoles() {
+ return roles;
}
/**
@@ -116,14 +103,14 @@ public class HadoopDaemonRemoteCluster i
*
* @param conf
* Configuration object containing values for
- * TEST_SYSTEM_HADOOPHOME_CONF_KEY and
- * TEST_SYSTEM_HADOOPCONFDIR_CONF_KEY
+ * CONF_HADOOPHOME and
+ * CONF_HADOOPCONFDIR
*
* @throws IllegalArgumentException
* if the configuration or system property set does not contain
* values for the required keys.
*/
- private void populateDirectories(Configuration conf) {
+ protected void populateDirectories(Configuration conf) {
hadoopHome = conf.get(CONF_HADOOPHOME, System
.getProperty(CONF_HADOOPHOME));
hadoopConfDir = conf.get(CONF_HADOOPCONFDIR, System
@@ -147,69 +134,57 @@ public class HadoopDaemonRemoteCluster i
}
@Override
- public RemoteProcess getMaster() {
- return master;
- }
-
- @Override
- public Map<String, RemoteProcess> getSlaves() {
- return slaves;
- }
-
- @Override
public void start() throws IOException {
- // start master first.
- master.start();
- for (RemoteProcess slave : slaves.values()) {
- slave.start();
+ for (RemoteProcess process : processes) {
+ process.start();
}
}
@Override
public void stop() throws IOException {
- master.kill();
- for (RemoteProcess slave : slaves.values()) {
- slave.kill();
+ for (RemoteProcess process : processes) {
+ process.kill();
}
}
- private void populateDaemons(String confLocation) throws IOException {
- File mastersFile = new File(confLocation, "masters");
- File slavesFile = new File(confLocation, "slaves");
+ protected void populateDaemon(String confLocation,
+ HadoopDaemonInfo info) throws IOException {
+ File hostFile = new File(confLocation, info.hostFile);
BufferedReader reader = null;
+ reader = new BufferedReader(new FileReader(hostFile));
+ String host = null;
try {
- reader = new BufferedReader(new FileReader(mastersFile));
- String masterHost = null;
- masterHost = reader.readLine();
- if (masterHost != null && !masterHost.trim().isEmpty()) {
- master = new ScriptDaemon(masterCommand, masterHost);
+ boolean foundAtLeastOne = false;
+ while ((host = reader.readLine()) != null) {
+ if (host.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Hostname could not be found in file " + info.hostFile);
+ }
+ InetAddress addr = InetAddress.getByName(host);
+ RemoteProcess process = new ScriptDaemon(info.cmd,
+ addr.getCanonicalHostName(), info.role);
+ processes.add(process);
+ foundAtLeastOne = true;
+ }
+ if (!foundAtLeastOne) {
+ throw new IllegalArgumentException("Alteast one hostname " +
+ "is required to be present in file - " + info.hostFile);
}
} finally {
try {
reader.close();
} catch (Exception e) {
- LOG.error("Can't read masters file from " + confLocation);
- }
-
- }
- try {
- reader = new BufferedReader(new FileReader(slavesFile));
- String slaveHost = null;
- while ((slaveHost = reader.readLine()) != null) {
- InetAddress addr = InetAddress.getByName(slaveHost);
- RemoteProcess slave = new ScriptDaemon(slaveCommand,
- addr.getCanonicalHostName());
- slaves.put(addr.getCanonicalHostName(), slave);
- }
- } finally {
- try {
- reader.close();
- } catch (Exception e) {
- LOG.error("Can't read slaves file from " + confLocation);
+ LOG.warn("Could not close reader");
}
}
}
+ protected void populateDaemons(String confLocation) throws IOException {
+ for (HadoopDaemonInfo info : daemonInfos) {
+ populateDaemon(confLocation, info);
+ }
+ }
+
/**
* The core daemon class which actually implements the remote process
* management of actual daemon processes in the cluster.
@@ -222,10 +197,12 @@ public class HadoopDaemonRemoteCluster i
private static final String SCRIPT_NAME = "hadoop-daemon.sh";
private final String daemonName;
private final String hostName;
+ private final Enum<?> role;
- public ScriptDaemon(String daemonName, String hostName) {
+ public ScriptDaemon(String daemonName, String hostName, Enum<?> role) {
this.daemonName = daemonName;
this.hostName = hostName;
+ this.role = role;
}
@Override
@@ -272,6 +249,10 @@ public class HadoopDaemonRemoteCluster i
public void start() throws IOException {
buildCommandExecutor(START_COMMAND).execute();
}
- }
+ @Override
+ public Enum<?> getRole() {
+ return role;
+ }
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java?rev=1077263&r1=1077262&r2=1077263&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java Fri Mar 4 03:57:38 2011
@@ -26,4 +26,11 @@ public interface RemoteProcess {
* @throws IOException if shutdown fails.
*/
void kill() throws IOException;
+
+ /**
+ * Get the role of the Daemon in the cluster.
+ *
+ * @return Enum
+ */
+ Enum<?> getRole();
}
\ No newline at end of file