Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2010/06/21 21:02:51 UTC

svn commit: r956666 [2/4] - in /hadoop/mapreduce/trunk: ./ ivy/ src/test/aop/build/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/testjar/ src/test/system/ src/test/system/aop/ src/test/system/aop/org/ src/test/system/aop/org/apache/ src/te...

Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+
+public privileged aspect ClusterAspect {
+
+  public ClientProtocol Cluster.getClientProtocol() {
+    return client;
+  }
+}

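The aspect above is an AspectJ inter-type declaration: because the aspect is declared privileged, the method it injects into Cluster may read Cluster's otherwise-private client field. A minimal usage sketch from plain Java, assuming the aspect has been woven into org.apache.hadoop.mapreduce.Cluster at build time (the class name ClusterAspectUsage is illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Cluster;
    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;

    public class ClusterAspectUsage {
      public static void main(String[] args) throws Exception {
        Cluster cluster = new Cluster(new Configuration());
        // getClientProtocol() exists only after weaving; it exposes the
        // private RPC client so system tests can drive it directly.
        ClientProtocol protocol = cluster.getClientProtocol();
        System.out.println("client protocol: " + protocol);
      }
    }
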
Added: hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml (added)
+++ hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml Mon Jun 21 19:02:49 2010
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+<!-- Mandatory properties that are to be set and uncommented before running the tests -->
+
+<property>
+  <name>test.system.hdrc.hadoophome</name>
+  <value>$(TO_DO_HADOOP_INSTALL)/share/hadoop-current</value>
+  <description> This is the path to the home directory of the hadoop deployment.
+  </description>
+</property>
+<property>
+  <name>test.system.hdrc.hadoopconfdir</name>
+  <value>$(TO_DO_HADOOP_INSTALL)/conf/hadoop</value>
+  <description> This is the path to the configuration directory of the hadoop
+  cluster that is deployed.
+  </description>
+</property>
+
+<property>
+  <name>test.system.hdrc.tt.hostfile</name>
+  <value>slaves.localcopy.txt</value>
+  <description> File name containing the hostnames where the TaskTrackers are running.
+  </description>
+</property>
+
+<property>
+  <name>test.system.mr.clusterprocess.impl.class</name>
+  <value>org.apache.hadoop.mapreduce.test.system.MRCluster$MRProcessManager</value>
+  <description>
+  Cluster process manager for the Mapreduce subsystem of the cluster. The value
+  org.apache.hadoop.mapreduce.test.system.MRCluster$MultiMRProcessManager can
+  be used to enable multi-user support.
+  </description>
+</property>
+
+<property>
+   <name>test.system.hdrc.deployed.scripts.dir</name>
+   <value>./src/test/system/scripts</value>
+   <description>
+     This directory hosts the scripts in the deployed location where
+     the system test client runs.
+   </description>
+</property>
+
+<property>
+  <name>test.system.hdrc.hadoopnewconfdir</name>
+  <value>$(TO_DO_GLOBAL_TMP_DIR)/newconf</value>
+  <description>
+  The directory to which the new config files will be copied on all
+  the cluster nodes.
+  </description>
+</property>
+
+<!-- Mandatory keys that must be set to enable multi-user support. -->
+
+ <property>
+  <name>test.system.mr.clusterprocess.impl.class</name>
+  <value>org.apache.hadoop.mapreduce.test.system.MRCluster$MultiMRProcessManager</value>
+  <description>
+    Enables the multi-user cluster process manager.
+  </description>
+</property>
+
+<property>
+  <name>test.system.hdrc.multi-user.binary.path</name>
+  <value>$(TO_DO_HADOOP_INSTALL)/conf/hadoop/runAs</value>
+  <description>
+    Local file system path on the gateway to the cluster-controller binary, including the
+    binary name. To build the binary, execute the following commands:
+     % ant run-as -Drun-as.hadoop.home.dir=(HADOOP_HOME of setup cluster)
+     % cp build-fi/system/c++-build/runAs test.system.hdrc.multi-user.binary.path
+    The location of the binary is an important security precaution. The binary should be
+    owned by root, and the test user group's permissions should be set so that only that
+    group can execute it. For example:
+     % sudo chown root binary
+     % sudo chmod 6511 binary
+    Adjust the permissions as appropriate to make the setup more secure.
+  </description>
+</property>
+
+<property>
+  <name>test.system.hdrc.multi-user.managinguser.jobtracker</name>
+  <value>*</value>
+  <description>
+    User that manages the particular daemon. Note that this user must also be
+    present on the gateways. An example configuration would be
+    key name = test.system.hdrc.multi-user.managinguser.jobtracker
+    key value = guest
+    Note that the daemon names are all lower case, matching the hadoop-daemon.sh command.
+  </description>
+</property>
+<property>
+  <name>test.system.hdrc.multi-user.managinguser.tasktracker</name>
+  <value>*</value>
+</property>
+ 
+</configuration>

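A test consumes the keys above by adding the deployed copy of this file to its Configuration. A minimal sketch, assuming the deployed copy is named system-test.xml (the name MRCluster, later in this commit, actually loads) and sits on the test classpath; the class name is illustrative:

    import org.apache.hadoop.conf.Configuration;

    public class SystemTestConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("system-test.xml");
        // Selects MRProcessManager or MultiMRProcessManager (see above).
        String impl = conf.get("test.system.mr.clusterprocess.impl.class");
        // Host file listing the TaskTrackers; "slaves" is the usual default.
        String ttHosts = conf.get("test.system.hdrc.tt.hostfile", "slaves");
        System.out.println(impl + " / " + ttHosts);
      }
    }
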
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+
+/**
+ * Concrete implementation of the JobInfo interface which is exposed to the
+ * clients.
+ * Look at {@link JobInfo} for further details.
+ */
+class JobInfoImpl implements JobInfo {
+
+  private List<String> blackListedTracker;
+  private String historyUrl;
+  private JobID id;
+  private boolean setupLaunched;
+  private boolean setupFinished;
+  private boolean cleanupLaunched;
+  private JobStatus status;
+  private int runningMaps;
+  private int runningReduces;
+  private int waitingMaps;
+  private int waitingReduces;
+  private int finishedMaps;
+  private int finishedReduces;
+  private int numMaps;
+  private int numReduces;
+  private boolean historyCopied;
+
+  public JobInfoImpl() {
+    id = new JobID();
+    status = new JobStatus();
+    blackListedTracker = new LinkedList<String>();
+    historyUrl = "";
+  }
+  
+  public JobInfoImpl(
+      JobID id, boolean setupLaunched, boolean setupFinished,
+      boolean cleanupLaunched, int runningMaps, int runningReduces,
+      int waitingMaps, int waitingReduces, int finishedMaps,
+      int finishedReduces, JobStatus status, String historyUrl,
+      List<String> blackListedTracker, boolean isComplete, int numMaps,
+      int numReduces, boolean historyCopied) {
+    super();
+    this.blackListedTracker = blackListedTracker;
+    this.historyUrl = historyUrl;
+    this.id = id;
+    this.setupLaunched = setupLaunched;
+    this.setupFinished = setupFinished;
+    this.cleanupLaunched = cleanupLaunched;
+    this.status = status;
+    this.runningMaps = runningMaps;
+    this.runningReduces = runningReduces;
+    this.waitingMaps = waitingMaps;
+    this.waitingReduces = waitingReduces;
+    this.finishedMaps = finishedMaps;
+    this.finishedReduces = finishedReduces;
+    this.numMaps = numMaps;
+    this.numReduces = numReduces;
+    this.historyCopied = historyCopied;
+  }
+
+  @Override
+  public List<String> getBlackListedTrackers() {
+    return blackListedTracker;
+  }
+
+  @Override
+  public String getHistoryUrl() {
+    return historyUrl;
+  }
+
+  @Override
+  public JobID getID() {
+    return id;
+  }
+
+  @Override
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  @Override
+  public boolean isCleanupLaunched() {
+    return cleanupLaunched;
+  }
+
+  @Override
+  public boolean isSetupLaunched() {
+    return setupLaunched;
+  }
+
+  @Override
+  public boolean isSetupFinished() {
+    return setupFinished;
+  }
+
+  @Override
+  public int runningMaps() {
+    return runningMaps;
+  }
+
+  @Override
+  public int runningReduces() {
+    return runningReduces;
+  }
+
+  @Override
+  public int waitingMaps() {
+    return waitingMaps;
+  }
+
+  @Override
+  public int waitingReduces() {
+    return waitingReduces;
+  }
+ 
+  @Override
+  public int finishedMaps() {
+    return finishedMaps;
+  }
+
+  @Override
+  public int finishedReduces() {
+    return finishedReduces;
+  }
+  
+  @Override
+  public int numMaps() {
+    return numMaps;
+  }
+  
+  @Override
+  public int numReduces() {
+    return numReduces;
+  }
+  
+  @Override
+  public boolean isHistoryFileCopied() {
+    return historyCopied;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id.readFields(in);
+    setupLaunched = in.readBoolean();
+    setupFinished = in.readBoolean();
+    cleanupLaunched = in.readBoolean();
+    status.readFields(in);
+    runningMaps = in.readInt();
+    runningReduces = in.readInt();
+    waitingMaps = in.readInt();
+    waitingReduces = in.readInt();
+    historyUrl = in.readUTF();
+    int size = in.readInt();
+    for (int i = 0; i < size; i++) {
+      blackListedTracker.add(in.readUTF());
+    }
+    finishedMaps = in.readInt();
+    finishedReduces = in.readInt();
+    numMaps = in.readInt();
+    numReduces = in.readInt();
+    historyCopied = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    id.write(out);
+    out.writeBoolean(setupLaunched);
+    out.writeBoolean(setupFinished);
+    out.writeBoolean(cleanupLaunched);
+    status.write(out);
+    out.writeInt(runningMaps);
+    out.writeInt(runningReduces);
+    out.writeInt(waitingMaps);
+    out.writeInt(waitingReduces);
+    out.writeUTF(historyUrl);
+    out.writeInt(blackListedTracker.size());
+    for (String str : blackListedTracker) {
+      out.writeUTF(str);
+    }
+    out.writeInt(finishedMaps);
+    out.writeInt(finishedReduces);
+    out.writeInt(numMaps);
+    out.writeInt(numReduces);
+    out.writeBoolean(historyCopied);
+  }
+
+
+}

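Note that write() and readFields() above must stay field-for-field symmetric, or deserialization silently skews every later field. A hedged helper sketch for checking such pairs with Hadoop's in-memory buffers; both instances are caller-supplied because write() assumes fully populated fields, and the class name is illustrative:

    import java.io.IOException;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Writable;

    public final class WritableRoundTrip {
      // Serializes source and reads the bytes back into target,
      // exercising the write()/readFields() pair in wire order.
      public static <W extends Writable> W roundTrip(W source, W target)
          throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        source.write(out);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        target.readFields(in);
        return target;
      }
    }
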
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+
+/**
+ * Concrete implementation of the TaskTracker information which is passed to 
+ * the client from the JobTracker.
+ * Look at {@link TTInfo} for further details.
+ */
+
+class TTInfoImpl implements TTInfo {
+
+  private String taskTrackerName;
+  private TaskTrackerStatus status;
+
+  public TTInfoImpl() {
+    taskTrackerName = "";
+    status = new TaskTrackerStatus();
+  }
+  
+  public TTInfoImpl(String taskTrackerName, TaskTrackerStatus status) {
+    super();
+    this.taskTrackerName = taskTrackerName;
+    this.status = status;
+  }
+
+  @Override
+  public String getName() {
+    return taskTrackerName;
+  }
+
+  @Override
+  public TaskTrackerStatus getStatus() {
+    return status;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskTrackerName = in.readUTF();
+    status.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(taskTrackerName);
+    status.write(out);
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+/**
+ * Abstract class which passes the Task view of the TaskTracker to the client.
+ * See {@link TTTaskInfo} for further details.
+ *
+ */
+abstract class TTTaskInfoImpl implements TTTaskInfo {
+
+  private boolean slotTaken;
+  private boolean wasKilled;
+  TaskStatus status;
+  Configuration conf;
+  String user;
+  boolean isTaskCleanupTask;
+  private String pid;
+
+  public TTTaskInfoImpl() {
+  }
+
+  public TTTaskInfoImpl(boolean slotTaken, boolean wasKilled,
+      TaskStatus status, Configuration conf, String user,
+      boolean isTaskCleanupTask, String pid) {
+    super();
+    this.slotTaken = slotTaken;
+    this.wasKilled = wasKilled;
+    this.status = status;
+    this.conf = conf;
+    this.user = user;
+    this.isTaskCleanupTask = isTaskCleanupTask;
+    this.pid = pid;
+  }
+
+  @Override
+  public boolean slotTaken() {
+    return slotTaken;
+  }
+
+  @Override
+  public boolean wasKilled() {
+    return wasKilled;
+  }
+
+  @Override
+  public abstract TaskStatus getTaskStatus();
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+  @Override
+  public String getUser() {
+    return user;
+  }
+  
+  @Override
+  public boolean isTaskCleanupTask() {
+    return isTaskCleanupTask;
+  }
+  
+  @Override
+  public String getPid() {
+    return pid;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    slotTaken = in.readBoolean();
+    wasKilled = in.readBoolean();
+    conf = new Configuration();
+    conf.readFields(in);
+    user = in.readUTF();
+    isTaskCleanupTask = in.readBoolean();
+    pid = in.readUTF();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(slotTaken);
+    out.writeBoolean(wasKilled);
+    conf.write(out);
+    out.writeUTF(user);
+    out.writeBoolean(isTaskCleanupTask);
+    if (pid != null) {
+      out.writeUTF(pid);
+    } else {
+      out.writeUTF("");
+    }
+    status.write(out);
+  }
+
+  static class MapTTTaskInfo extends TTTaskInfoImpl {
+
+    public MapTTTaskInfo() {
+      super();
+    }
+
+    public MapTTTaskInfo(boolean slotTaken, boolean wasKilled,
+        MapTaskStatus status, Configuration conf, String user,
+        boolean isTaskCleanup,String pid) {
+      super(slotTaken, wasKilled, status, conf, user, isTaskCleanup, pid);
+    }
+
+    @Override
+    public TaskStatus getTaskStatus() {
+      return status;
+    }
+    
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      status = new MapTaskStatus();
+      status.readFields(in);
+    }
+  }
+
+  static class ReduceTTTaskInfo extends TTTaskInfoImpl {
+
+    public ReduceTTTaskInfo() {
+      super();
+    }
+
+    public ReduceTTTaskInfo(boolean slotTaken, boolean wasKilled,
+        ReduceTaskStatus status, Configuration conf, String user,
+        boolean isTaskCleanup, String pid) {
+      super(slotTaken, wasKilled, status, conf, user, isTaskCleanup, pid);
+    }
+
+    @Override
+    public TaskStatus getTaskStatus() {
+      return status;
+    }
+    
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      status = new ReduceTaskStatus();
+      status.readFields(in);
+    }
+  }
+
+}

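The Map/Reduce subclasses above follow a standard Writable idiom: the stream carries no type tag, so the abstract base serializes the polymorphic status field last and leaves reading it to the subclass, which knows which concrete TaskStatus to instantiate. A distilled, hypothetical sketch of the same idiom (PolymorphicInfo and TextInfo are illustrative names, not part of this commit):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    abstract class PolymorphicInfo implements Writable {
      Writable payload;                 // plays the role of `status`
      @Override
      public void write(DataOutput out) throws IOException {
        out.writeBoolean(true);         // common fields first ...
        payload.write(out);             // ... polymorphic field last
      }
      @Override
      public void readFields(DataInput in) throws IOException {
        in.readBoolean();               // common fields only; payload
      }                                 // is left to the subclass
    }

    class TextInfo extends PolymorphicInfo {
      @Override
      public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        payload = new Text();           // concrete type chosen here
        payload.readFields(in);
      }
    }
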
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+
+/**
+ * Concrete class to expose the task-related information to the clients from
+ * the JobTracker. Look at {@link TaskInfo} for further details.
+ */
+class TaskInfoImpl implements TaskInfo {
+
+  private double progress;
+  private TaskID taskID;
+  private int killedAttempts;
+  private int failedAttempts;
+  private int runningAttempts;
+  private TaskStatus[] taskStatus;
+  private boolean setupOrCleanup;
+  private String[] taskTrackers;
+
+  public TaskInfoImpl() {
+    taskID = new TaskID();
+  }
+
+  public TaskInfoImpl(
+      TaskID taskID, double progress, int runningAttempts, int killedAttempts,
+      int failedAttempts, TaskStatus[] taskStatus, boolean setupOrCleanup,
+      String[] taskTrackers) {
+    this.progress = progress;
+    this.taskID = taskID;
+    this.killedAttempts = killedAttempts;
+    this.failedAttempts = failedAttempts;
+    this.runningAttempts = runningAttempts;
+    if (taskStatus != null) {
+      this.taskStatus = taskStatus;
+    } else {
+      if (taskID.getTaskType() == TaskType.MAP) {
+        this.taskStatus = new MapTaskStatus[] {};
+      } else {
+        this.taskStatus = new ReduceTaskStatus[] {};
+      }
+    }
+    this.setupOrCleanup = setupOrCleanup;
+    this.taskTrackers = taskTrackers;
+  }
+
+  @Override
+  public double getProgress() {
+    return progress;
+  }
+
+  @Override
+  public TaskID getTaskID() {
+    return taskID;
+  }
+
+  @Override
+  public int numKilledAttempts() {
+    return killedAttempts;
+  }
+
+  @Override
+  public int numFailedAttempts() {
+    return failedAttempts;
+  }
+
+  @Override
+  public int numRunningAttempts() {
+    return runningAttempts;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskID.readFields(in);
+    progress = in.readDouble();
+    runningAttempts = in.readInt();
+    killedAttempts = in.readInt();
+    failedAttempts = in.readInt();
+    int size = in.readInt();
+    if (taskID.getTaskType() == TaskType.MAP) {
+      taskStatus = new MapTaskStatus[size];
+    } else {
+      taskStatus = new ReduceTaskStatus[size];
+    }
+    for (int i = 0; i < size; i++) {
+      if (taskID.getTaskType() == TaskType.MAP) {
+        taskStatus[i] = new MapTaskStatus();
+      } else {
+        taskStatus[i] = new ReduceTaskStatus();
+      }
+      taskStatus[i].readFields(in);
+      taskStatus[i].setTaskTracker(in.readUTF());
+    }
+    setupOrCleanup = in.readBoolean();
+    size = in.readInt();
+    taskTrackers = new String[size];
+    for (int i = 0; i < size; i++) {
+      taskTrackers[i] = in.readUTF();
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskID.write(out);
+    out.writeDouble(progress);
+    out.writeInt(runningAttempts);
+    out.writeInt(killedAttempts);
+    out.writeInt(failedAttempts);
+    out.writeInt(taskStatus.length);
+    for (TaskStatus t : taskStatus) {
+      t.write(out);
+      out.writeUTF(t.getTaskTracker());
+    }
+    out.writeBoolean(setupOrCleanup);
+    out.writeInt(taskTrackers.length);
+    for (String tt : taskTrackers) {
+      out.writeUTF(tt);
+    }
+  }
+
+  @Override
+  public TaskStatus[] getTaskStatus() {
+    return taskStatus;
+  }
+
+  @Override
+  public boolean isSetupOrCleanup() {
+    return setupOrCleanup;
+  }
+
+  @Override
+  public String[] getTaskTrackers() {
+    return taskTrackers;
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.test.system.ControlAction;
+
+/**
+ * Control Action which signals a controlled task to proceed to completion. <br/>
+ */
+public class FinishTaskControlAction extends ControlAction<TaskID> {
+
+  private static final String ENABLE_CONTROLLED_TASK_COMPLETION =
+      "test.system.enabled.task.completion.control";
+
+  /**
+   * Create a default control action. <br/>
+   * 
+   */
+  public FinishTaskControlAction() {
+    super(new TaskID());
+  }
+
+  /**
+   * Create a control action specific to a particular task. <br/>
+   * 
+   * @param id
+   *          of the task.
+   */
+  public FinishTaskControlAction(TaskID id) {
+    super(id);
+  }
+
+  /**
+   * Sets up the job to be controlled using the finish task control action. 
+   * <br/>
+   * 
+   * @param conf
+   *          configuration to be used to submit the job.
+   */
+  public static void configureControlActionForJob(Configuration conf) {
+    conf.setBoolean(ENABLE_CONTROLLED_TASK_COMPLETION, true);
+  }
+  
+  /**
+   * Checks if the control action is enabled in the passed configuration. <br/>
+   * @param conf configuration
+   * @return true if action is enabled.
+   */
+  public static boolean isControlActionEnabled(Configuration conf) {
+    return conf.getBoolean(ENABLE_CONTROLLED_TASK_COMPLETION, false);
+  }
+}

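A hedged sketch of the intended use: a test sets the flag on the job configuration before submission, and later builds an action for a specific task to let it finish. Delivering the action to the TaskTracker goes through the control-action plumbing of DaemonProtocol, which is outside this hunk; the class name is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.TaskID;
    import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;

    public class FinishTaskControlSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Mark the job so its tasks wait for an explicit finish signal.
        FinishTaskControlAction.configureControlActionForJob(conf);
        assert FinishTaskControlAction.isControlActionEnabled(conf);
        // ... submit the job with conf, then release one task:
        FinishTaskControlAction release =
            new FinishTaskControlAction(new TaskID());
        System.out.println("built action: " + release);
      }
    }
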
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import static org.junit.Assert.*;
+
+/**
+ * JobTracker client for system tests.
+ */
+public class JTClient extends MRDaemonClient<JTProtocol> {
+  static final Log LOG = LogFactory.getLog(JTClient.class);
+  private JobClient client;
+
+  /**
+   * Create JobTracker client to talk to {@link JobTracker} specified in the
+   * configuration. <br/>
+   * 
+   * @param conf
+   *          configuration used to create a client.
+   * @param daemon
+   *          the process management instance for the {@link JobTracker}
+   * @throws IOException
+   */
+  public JTClient(Configuration conf, RemoteProcess daemon) throws IOException {
+    super(conf, daemon);
+  }
+
+  @Override
+  public synchronized void connect() throws IOException {
+    if (isConnected()) {
+      return;
+    }
+    client = new JobClient(new JobConf(getConf()));
+    setConnected(true);
+  }
+
+  @Override
+  public synchronized void disconnect() throws IOException {
+    client.close();
+  }
+
+  @Override
+  public synchronized JTProtocol getProxy() {
+    return (JTProtocol) client.getProtocol();
+  }
+
+  /**
+   * Gets the {@link JobClient} which can be used for job submission. The
+   * returned JobClient does not contain the decorated APIs and is intended
+   * for submitting jobs.
+   * 
+   * @return client handle to the JobTracker
+   */
+  public JobClient getClient() {
+    return client;
+  }
+
+  /**
+   * Gets the configuration with which the JobTracker is currently running.<br/>
+   * 
+   * @return configuration of JobTracker.
+   * 
+   * @throws IOException
+   */
+  public Configuration getJobTrackerConfig() throws IOException {
+    return getProxy().getDaemonConf();
+  }
+
+  /**
+   * Kills the job. <br/>
+   * 
+   * @param id
+   *          of the job to be killed.
+   * @throws IOException
+   */
+  public void killJob(JobID id) throws IOException {
+    try {
+      getClient().killJob(id);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Verification API to check running jobs and their states. Users have
+   * to ensure that their jobs remain in the running state while this
+   * verification is called. <br/>
+   * 
+   * @param jobId
+   *          of the job to be verified.
+   * 
+   * @throws Exception
+   */
+  public void verifyRunningJob(JobID jobId) throws Exception {
+  }
+
+  private JobInfo getJobInfo(JobID jobId) throws IOException {
+    JobInfo info = getProxy().getJobInfo(jobId);
+    if (info == null && !getProxy().isJobRetired(jobId)) {
+      Assert.fail("Job id : " + jobId + " has never been submitted to JT");
+    }
+    return info;
+  }
+
+  /**
+   * Submits the job, waits till it completes, and verifies that the
+   * completed job state is correct. <br/>
+   * 
+   * @param job
+   *          the job to be submitted and verified
+   * @return job handle
+   * @throws Exception
+   */
+  public Job submitAndVerifyJob(Job job) throws Exception {
+    job.submit();
+    JobID jobId = job.getJobID();
+    verifyRunningJob(jobId);
+    verifyCompletedJob(jobId);
+    return job;
+  }
+
+  /**
+   * Verification API to check if the job completion state is correct. <br/>
+   * 
+   * @param id
+   *          id of the job to be verified.
+   */
+
+  public void verifyCompletedJob(JobID id) throws Exception {
+    RunningJob rJob =
+        getClient().getJob(org.apache.hadoop.mapred.JobID.downgrade(id));
+    while (!rJob.isComplete()) {
+      LOG.info("waiting for job :" + id + " to complete");
+      Thread.sleep(1000);
+      rJob = getClient().getJob(org.apache.hadoop.mapred.JobID.downgrade(id));
+    }
+    verifyJobDetails(id);
+    JobInfo jobInfo = getJobInfo(id);
+    if (jobInfo != null) {
+      while (!jobInfo.isHistoryFileCopied()) {
+        Thread.sleep(1000);
+        LOG.info(id + " waiting for history file to be copied");
+        jobInfo = getJobInfo(id);
+        if (jobInfo == null) {
+          break;
+        }
+      }
+    }
+    verifyJobHistory(id);
+  }
+
+  /**
+   * Verification API to check if the job details are semantically correct.<br/>
+   * 
+   * @param jobId
+   *          jobID of the job
+   * @throws Exception
+   */
+  public void verifyJobDetails(JobID jobId) throws Exception {
+    // wait till the setup is launched and finished.
+    JobInfo jobInfo = getJobInfo(jobId);
+    if (jobInfo == null) {
+      return;
+    }
+    LOG.info("waiting for the setup to be finished");
+    while (!jobInfo.isSetupFinished()) {
+      Thread.sleep(2000);
+      jobInfo = getJobInfo(jobId);
+      if (jobInfo == null) {
+        break;
+      }
+    }
+    // verify job id.
+    assertTrue(jobId.toString().startsWith("job_"));
+    LOG.info("verified job id and is : " + jobId.toString());
+    // verify the number of map/reduce tasks.
+    verifyNumTasks(jobId);
+    // should verify job progress.
+    verifyJobProgress(jobId);
+    jobInfo = getJobInfo(jobId);
+    if (jobInfo == null) {
+      return;
+    }
+    if (jobInfo.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+      // verify if map/reduce progress reached 1.
+      jobInfo = getJobInfo(jobId);
+      if (jobInfo == null) {
+        return;
+      }
+      assertEquals(1.0, jobInfo.getStatus().mapProgress(), 0.001);
+      assertEquals(1.0, jobInfo.getStatus().reduceProgress(), 0.001);
+      // verify successful finish of tasks.
+      verifyAllTasksSuccess(jobId);
+    }
+    if (jobInfo.getStatus().isJobComplete()) {
+      // verify if the cleanup is launched.
+      jobInfo = getJobInfo(jobId);
+      if (jobInfo == null) {
+        return;
+      }
+      assertTrue(jobInfo.isCleanupLaunched());
+      LOG.info("Verified launching of cleanup");
+    }
+  }
+
+  public void verifyAllTasksSuccess(JobID jobId) throws IOException {
+    JobInfo jobInfo = getJobInfo(jobId);
+    if (jobInfo == null) {
+      return;
+    }
+
+    TaskInfo[] taskInfos = getProxy().getTaskInfo(jobId);
+
+    if (taskInfos.length == 0 && getProxy().isJobRetired(jobId)) {
+      LOG.info("Job has been retired from JT memory : " + jobId);
+      return;
+    }
+
+    for (TaskInfo taskInfo : taskInfos) {
+      TaskStatus[] taskStatus = taskInfo.getTaskStatus();
+      if (taskStatus != null && taskStatus.length > 0) {
+        int i;
+        for (i = 0; i < taskStatus.length; i++) {
+          if (TaskStatus.State.SUCCEEDED.equals(taskStatus[i].getRunState())) {
+            break;
+          }
+        }
+        assertFalse(i == taskStatus.length);
+      }
+    }
+    LOG.info("verified that every task has a successful attempt.");
+  }
+
+  public void verifyJobProgress(JobID jobId) throws IOException {
+    JobInfo jobInfo;
+    jobInfo = getJobInfo(jobId);
+    if (jobInfo == null) {
+      return;
+    }
+    assertTrue(jobInfo.getStatus().mapProgress() >= 0
+        && jobInfo.getStatus().mapProgress() <= 1);
+    LOG.info("verified map progress and is "
+        + jobInfo.getStatus().mapProgress());
+    assertTrue(jobInfo.getStatus().reduceProgress() >= 0
+        && jobInfo.getStatus().reduceProgress() <= 1);
+    LOG.info("verified reduce progress and is "
+        + jobInfo.getStatus().reduceProgress());
+  }
+
+  public void verifyNumTasks(JobID jobId) throws IOException {
+    JobInfo jobInfo;
+    jobInfo = getJobInfo(jobId);
+    if (jobInfo == null) {
+      return;
+    }
+    assertEquals(jobInfo.numMaps(), (jobInfo.runningMaps()
+        + jobInfo.waitingMaps() + jobInfo.finishedMaps()));
+    LOG.info("verified number of map tasks and is " + jobInfo.numMaps());
+
+    assertEquals(jobInfo.numReduces(), (jobInfo.runningReduces()
+        + jobInfo.waitingReduces() + jobInfo.finishedReduces()));
+    LOG.info("verified number of reduce tasks and is " + jobInfo.numReduces());
+  }
+
+  /**
+   * Verification API to check if the job history file is semantically correct. <br/>
+   * 
+   * 
+   * @param jobId
+   *          of the job to be verified.
+   * @throws IOException
+   */
+  public void verifyJobHistory(JobID jobId) throws IOException {
+    JobInfo info = getJobInfo(jobId);
+    String url = "";
+    if (info == null) {
+      LOG.info("Job has been retired from JT memory : " + jobId);
+      url = getProxy().getJobHistoryLocationForRetiredJob(jobId);
+    } else {
+      url = info.getHistoryUrl();
+    }
+    Path p = new Path(url);
+    if (p.toUri().getScheme().equals("file")) {
+      FileStatus st = getFileStatus(url, true);
+      Assert.assertNotNull("Job History file for "
+          + jobId + " not present " + "when job is completed", st);
+    } else {
+      FileStatus st = getFileStatus(url, false);
+      Assert.assertNotNull("Job History file for "
+          + jobId + " not present " + "when job is completed", st);
+    }
+    LOG.info("Verified the job history for the jobId : " + jobId);
+  }
+}

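A hedged end-to-end sketch of the verification flow, using only APIs defined in this commit and assuming the cluster daemons are already running with system-test.xml on the classpath; configuring a real job payload is elided and the class name is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.test.system.JTClient;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;

    public class JTClientFlowSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        MRCluster cluster = MRCluster.createCluster(conf);
        JTClient jt = cluster.getJTClient();
        jt.connect();
        Job job = new Job(conf);      // illustrative; set mapper/reducer
        // ... configure input/output paths on `job` here ...
        jt.submitAndVerifyJob(job);   // submit, wait, verify details+history
        jt.disconnect();
      }
    }
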
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * Client-side APIs exposed by the JobTracker.
+ */
+public interface JTProtocol extends DaemonProtocol {
+  long versionID = 1L;
+
+  /**
+   * Gets the information pertaining to the given job.<br/>
+   * The returned JobInfo object can be null when the specified
+   * job has been retired from the JobTracker memory, which
+   * happens after the job is completed. <br/>
+   * 
+   * @param jobID
+   *          id of the job for which information is required.
+   * @return information regarding the job, or null if the job has
+   *         been retired from JobTracker memory.
+   * @throws IOException
+   */
+  public JobInfo getJobInfo(JobID jobID) throws IOException;
+
+  /**
+   * Gets the information pertaining to a task. <br/>
+   * The returned TaskInfo object can be null when the specified
+   * task has been retired from the JobTracker memory, which
+   * happens after the job is completed. <br/>
+   * @param taskID
+   *          id of the task for which information is required.
+   * @return information regarding the task, or null if the task
+   *          has been retired from JobTracker memory.
+   * @throws IOException
+   */
+  public TaskInfo getTaskInfo(TaskID taskID) throws IOException;
+
+  /**
+   * Gets the information pertaining to a given TaskTracker. <br/>
+   * The returned TTInfo can be null if the given TaskTracker's
+   * information has been removed from the JobTracker memory, which is done
+   * when the TaskTracker is marked lost by the JobTracker. <br/>
+   * @param trackerName
+   *          name of the tracker.
+   * @return information regarding the tracker, or null if the TaskTracker
+   *          is marked lost by the JobTracker.
+   * @throws IOException
+   */
+  public TTInfo getTTInfo(String trackerName) throws IOException;
+
+  /**
+   * Gets a list of all jobs known to the JobTracker.<br/>
+   * 
+   * @return list of all jobs.
+   * @throws IOException
+   */
+  public JobInfo[] getAllJobInfo() throws IOException;
+
+  /**
+   * Gets a list of tasks pertaining to a job. <br/>
+   * 
+   * @param jobID
+   *          id of the job.
+   * 
+   * @return list of all tasks for the job.
+   * @throws IOException
+   */
+  public TaskInfo[] getTaskInfo(JobID jobID) throws IOException;
+
+  /**
+   * Gets a list of TaskTrackers which have reported to the JobTracker. <br/>
+   * 
+   * @return list of all TaskTrackers.
+   * @throws IOException
+   */
+  public TTInfo[] getAllTTInfo() throws IOException;
+
+  /**
+   * Checks if a given job has been retired from the JobTracker's memory. <br/>
+   * 
+   * @param jobID
+   *          id of the job
+   * @return true if job is retired.
+   * @throws IOException
+   */
+  boolean isJobRetired(JobID jobID) throws IOException;
+
+  /**
+   * Gets the location of the history file for a retired job. <br/>
+   * 
+   * @param jobID
+   *          id of the job
+   * @return location of history file
+   * @throws IOException
+   */
+  String getJobHistoryLocationForRetiredJob(JobID jobID) throws IOException;
+}

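The null-on-retirement contract above pushes a recurring lookup pattern onto callers (JTClient.verifyJobHistory, earlier in this commit, is one instance). A small sketch of that pattern; the helper name is illustrative:

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.test.system.JTProtocol;
    import org.apache.hadoop.mapreduce.test.system.JobInfo;

    public final class HistoryLookupSketch {
      // Resolves the history location whether or not the job retired.
      static String historyLocation(JTProtocol proxy, JobID id)
          throws IOException {
        JobInfo info = proxy.getJobInfo(id);
        if (info != null) {
          return info.getHistoryUrl();
        }
        if (proxy.isJobRetired(id)) {
          return proxy.getJobHistoryLocationForRetiredJob(id);
        }
        return null; // job was never submitted to this JobTracker
      }
    }
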
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Job state information as seen by the JobTracker.
+ */
+public interface JobInfo extends Writable {
+  /**
+   * Gets the JobId of the job.<br/>
+   * 
+   * @return id of the job.
+   */
+  JobID getID();
+
+  /**
+   * Gets the current status of the job.<br/>
+   * 
+   * @return status.
+   */
+  JobStatus getStatus();
+
+  /**
+   * Gets the history location of the job.<br/>
+   * 
+   * @return the path to the history file.
+   */
+  String getHistoryUrl();
+
+  /**
+   * Gets the number of maps which are currently running for the job. <br/>
+   * 
+   * @return number of maps running for the job.
+   */
+  int runningMaps();
+
+  /**
+   * Gets the number of reduces currently running for the job. <br/>
+   * 
+   * @return number of reduces running for the job.
+   */
+  int runningReduces();
+
+  /**
+   * Gets the number of maps to be scheduled for the job. <br/>
+   * 
+   * @return number of waiting maps.
+   */
+  int waitingMaps();
+
+  /**
+   * Gets the number of reduces to be scheduled for the job. <br/>
+   * 
+   * @return number of waiting reduces.
+   */
+  int waitingReduces();
+  
+  /**
+   * Gets the number of maps that are finished. <br/>
+   * @return the number of finished maps.
+   */
+  int finishedMaps();
+  
+  /**
+   * Gets the number of map tasks that are to be spawned for the job. <br/>
+   * @return the number of map tasks to be spawned.
+   */
+  int numMaps();
+  
+  /**
+   * Gets the number of reduce tasks that are to be spawned for the job. <br/>
+   * @return the number of reduce tasks to be spawned.
+   */
+  int numReduces();
+  
+  /**
+   * Gets the number of reduces that are finished. <br/>
+   * @return the number of finished reduces.
+   */
+  int finishedReduces();
+
+  /**
+   * Gets whether cleanup for the job has been launched.<br/>
+   * 
+   * @return true if cleanup task has been launched.
+   */
+  boolean isCleanupLaunched();
+
+  /**
+   * Gets whether the setup for the job has been launched.<br/>
+   * 
+   * @return true if setup task has been launched.
+   */
+  boolean isSetupLaunched();
+
+  /**
+   * Gets whether the setup for the job has been completed.<br/>
+   * 
+   * @return true if the setup task for the job has completed.
+   */
+  boolean isSetupFinished();
+
+  /**
+   * Gets list of blacklisted trackers for the particular job. <br/>
+   * 
+   * @return list of blacklisted tracker names.
+   */
+  List<String> getBlackListedTrackers();
+  
+  /**
+   * Gets whether the history file of the job has been copied to the done
+   * location. <br/>
+   * 
+   * @return true if the history file has been copied.
+   */
+  boolean isHistoryFileCopied();
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.AbstractDaemonCluster;
+import org.apache.hadoop.test.system.process.ClusterProcessManager;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
+import org.apache.hadoop.test.system.process.MultiUserHadoopDaemonRemoteCluster;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster.HadoopDaemonInfo;
+
+/**
+ * Concrete AbstractDaemonCluster representing a Map-Reduce cluster.
+ * 
+ */
+@SuppressWarnings("unchecked")
+public class MRCluster extends AbstractDaemonCluster {
+
+  private static final Log LOG = LogFactory.getLog(MRCluster.class);
+  public static final String CLUSTER_PROCESS_MGR_IMPL = 
+    "test.system.mr.clusterprocess.impl.class";
+
+  /**
+   * Key used to point to the file containing the hostnames of the TaskTrackers.
+   */
+  public static final String CONF_HADOOP_TT_HOSTFILE_NAME =
+    "test.system.hdrc.tt.hostfile";
+
+  private static List<HadoopDaemonInfo> mrDaemonInfos = 
+    new ArrayList<HadoopDaemonInfo>();
+  private static String TT_hostFileName;
+  private static String jtHostName;
+  private static final String SYSTEM_TEST_FILE = "system-test.xml";
+
+  protected enum Role {JT, TT};
+
+  static{
+    Configuration.addDefaultResource("mapred-default.xml");
+    Configuration.addDefaultResource("mapred-site.xml");
+  }
+
+  private MRCluster(Configuration conf, ClusterProcessManager rCluster)
+      throws IOException {
+    super(conf, rCluster);
+  }
+
+  /**
+   * Factory method to create an instance of the Map-Reduce cluster.<br/>
+   * 
+   * @param conf
+   *          contains all the required parameters to create the cluster.
+   * @return a cluster instance to be managed.
+   * @throws Exception
+   */
+  public static MRCluster createCluster(Configuration conf) 
+      throws Exception {
+    conf.addResource(SYSTEM_TEST_FILE);
+    TT_hostFileName = conf.get(CONF_HADOOP_TT_HOSTFILE_NAME, "slaves");
+    String jtHostPort = conf.get(JTConfig.JT_IPC_ADDRESS);
+    if (jtHostPort == null) {
+      throw new Exception(JTConfig.JT_IPC_ADDRESS + " is not set or "
+        + SYSTEM_TEST_FILE + " hasn't been found.");
+    }
+    jtHostName = jtHostPort.trim().split(":")[0];
+    
+    mrDaemonInfos.add(new HadoopDaemonInfo("jobtracker", 
+        Role.JT, Arrays.asList(new String[]{jtHostName})));
+    mrDaemonInfos.add(new HadoopDaemonInfo("tasktracker", 
+        Role.TT, TT_hostFileName));
+    
+    String implKlass = conf.get(CLUSTER_PROCESS_MGR_IMPL);
+    if (implKlass == null || implKlass.isEmpty()) {
+      implKlass = MRProcessManager.class.getName();
+    }
+    Class<ClusterProcessManager> klass = (Class<ClusterProcessManager>) Class
+      .forName(implKlass);
+    ClusterProcessManager clusterProcessMgr = klass.newInstance();
+    LOG.info("Created ClusterProcessManager as " + implKlass);
+    clusterProcessMgr.init(conf);
+    return new MRCluster(conf, clusterProcessMgr);
+  }
+
+  protected JTClient createJTClient(RemoteProcess jtDaemon)
+      throws IOException {
+    return new JTClient(getConf(), jtDaemon);
+  }
+
+  protected TTClient createTTClient(RemoteProcess ttDaemon) 
+      throws IOException {
+    return new TTClient(getConf(), ttDaemon);
+  }
+
+  public JTClient getJTClient() {
+    Iterator<AbstractDaemonClient> it = getDaemons().get(Role.JT).iterator();
+    return (JTClient) it.next();
+  }
+
+  public List<TTClient> getTTClients() {
+    return (List) getDaemons().get(Role.TT);
+  }
+
+  public TTClient getTTClient(String hostname) {
+    for (TTClient c : getTTClients()) {
+      if (c.getHostName().equals(hostname)) {
+        return c;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void ensureClean() throws IOException {
+    //TODO: ensure that no jobs/tasks are running
+    //restart the cluster if cleanup fails
+    JTClient jtClient = getJTClient();
+    JobInfo[] jobs = jtClient.getProxy().getAllJobInfo();
+    for(JobInfo job : jobs) {
+      jtClient.killJob(
+          org.apache.hadoop.mapred.JobID.downgrade(job.getID()));
+    }
+  }
+
+  @Override
+  protected AbstractDaemonClient createClient(
+      RemoteProcess process) throws IOException {
+    if (Role.JT.equals(process.getRole())) {
+      return createJTClient(process);
+    } else if (Role.TT.equals(process.getRole())) {
+      return createTTClient(process);
+    } else throw new IOException("Role: "+ process.getRole() + "  is not " +
+      "applicable to MRCluster");
+  }
+
+  public static class MRProcessManager extends HadoopDaemonRemoteCluster{
+    public MRProcessManager() {
+      super(mrDaemonInfos);
+    }
+  }
+
+  public static class MultiMRProcessManager
+      extends MultiUserHadoopDaemonRemoteCluster {
+    public MultiMRProcessManager() {
+      super(mrDaemonInfos);
+    }
+  }
+}

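Switching to the multi-user process manager is just the configuration toggle documented in system-test-mapred.xml earlier in this commit. A hedged sketch, assuming the daemons are already running so that the JTClient can connect; the class name is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;

    public class MultiUserClusterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(MRCluster.CLUSTER_PROCESS_MGR_IMPL,
            "org.apache.hadoop.mapreduce.test.system.MRCluster"
            + "$MultiMRProcessManager");
        MRCluster cluster = MRCluster.createCluster(conf);
        cluster.getJTClient().connect();
        // Kill any jobs still known to the JobTracker before testing.
        cluster.ensureClean();
      }
    }
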
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * Base class for JobTracker and TaskTracker clients.
+ */
+public abstract class MRDaemonClient<PROXY extends DaemonProtocol> 
+    extends AbstractDaemonClient<PROXY>{
+
+  public MRDaemonClient(Configuration conf, RemoteProcess process)
+      throws IOException {
+    super(conf, process);
+  }
+
+  public String[] getMapredLocalDirs() throws IOException {
+    return getProxy().getDaemonConf().getStrings(MRConfig.LOCAL_DIR);
+  }
+
+  public String getLogDir() throws IOException {
+    return getProcessInfo().getSystemProperties().get("hadoop.log.dir");
+  }
+}
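
A hedged usage sketch for these two accessors; `client` is assumed to be any connected MRDaemonClient subclass, such as a TTClient obtained from the cluster above:

    // Local dirs come from the daemon's MRConfig.LOCAL_DIR setting,
    String[] localDirs = client.getMapredLocalDirs();
    // while the log dir is read from the daemon's -Dhadoop.log.dir property.
    String logDir = client.getLogDir();
    System.out.println("local dirs: " + java.util.Arrays.toString(localDirs)
        + ", logs: " + logDir);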

Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * TaskTracker client for system tests. The class assumes that the
+ * configuration key {@code TTConfig.TT_REPORT_ADDRESS} is set; only the
+ * port portion of that address is used.
+ */
+public class TTClient extends MRDaemonClient<TTProtocol> {
+
+  TTProtocol proxy;
+  private static final String SYSTEM_TEST_FILE = "system-test.xml";
+
+  public TTClient(Configuration conf, RemoteProcess daemon) 
+      throws IOException {
+    super(conf, daemon);
+  }
+
+  @Override
+  public synchronized void connect() throws IOException {
+    if (isConnected()) {
+      return;
+    }
+    String sockAddrStr = getConf().get(TTConfig.TT_REPORT_ADDRESS);
+    if (sockAddrStr == null) {
+      throw new IllegalArgumentException(
+          "TaskTracker report address is not set");
+    }
+    String[] splits = sockAddrStr.split(":");
+    if (splits.length != 2) {
+      throw new IllegalArgumentException(TTConfig.TT_REPORT_ADDRESS
+        + " is not correctly configured or "
+        + SYSTEM_TEST_FILE + " hasn't been found.");
+    }
+    String port = splits[1];
+    String sockAddr = getHostName() + ":" + port;
+    InetSocketAddress bindAddr = NetUtils.createSocketAddr(sockAddr);
+    proxy = (TTProtocol) RPC.getProxy(TTProtocol.class, TTProtocol.versionID,
+        bindAddr, getConf());
+    setConnected(true);
+  }
+
+  @Override
+  public synchronized void disconnect() throws IOException {
+    RPC.stopProxy(proxy);
+  }
+
+  @Override
+  public synchronized TTProtocol getProxy() {
+    return proxy;
+  }
+
+  /**
+   * Gets the status last sent to the {@link JobTracker}. <br/>
+   * 
+   * @return the task tracker status.
+   * @throws IOException
+   */
+  public TaskTrackerStatus getStatus() throws IOException {
+    return getProxy().getStatus();
+  }
+
+}
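
To make connect() concrete: only the port of TT_REPORT_ADDRESS survives, and it is recombined with the client's own getHostName(). A small sketch of that parsing, with an illustrative address value (not the shipped default):

    String reportAddress = "127.0.0.1:50050";         // illustrative TT_REPORT_ADDRESS
    String port = reportAddress.split(":")[1];        // keep only "50050"
    String sockAddr = "tt-host.example.com:" + port;  // getHostName() supplies the host
    // NetUtils.createSocketAddr(sockAddr) is then handed to RPC.getProxy(...).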

Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+
+/**
+ * TaskTracker state information as seen by the JobTracker.
+ */
+public interface TTInfo extends Writable {
+  /**
+   * Gets the {@link TaskTracker} name.<br/>
+   * 
+   * @return name of the tracker.
+   */
+  String getName();
+
+  /**
+   * Gets the current status of the {@link TaskTracker}.<br/>
+   * 
+   * @return status of the {@link TaskTracker}
+   */
+  TaskTrackerStatus getStatus();
+}
\ No newline at end of file

Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+import java.io.IOException;
+
+/**
+ * TaskTracker RPC interface to be used for cluster tests.
+ *
+ * The protocol has to be annotated so that KerberosInfo can be filled in
+ * during creation of an ipc.Client connection.
+ */
+@KerberosInfo(
+    serverPrincipal = TaskTracker.TT_USER_NAME)
+@TokenInfo(JobTokenSelector.class)
+public interface TTProtocol extends DaemonProtocol {
+
+  public static final long versionID = 1L;
+  /**
+   * Gets the latest status that was sent in a heartbeat to the {@link JobTracker}.
+   * <br/>
+   * 
+   * @return status of the TaskTracker daemon
+   * @throws IOException in case of errors
+   */
+  TaskTrackerStatus getStatus() throws IOException;
+
+  /**
+   * Gets a list of all the tasks in the {@link TaskTracker}.<br/>
+   * 
+   * @return list of all the tasks
+   * @throws IOException in case of errors
+   */
+  TTTaskInfo[] getTasks() throws IOException;
+
+  /**
+   * Gets the task associated with the id.<br/>
+   * 
+   * @param taskID the id of the task.
+   * 
+   * @return task info as a <code>TTTaskInfo</code>
+   * @throws IOException in case of errors
+   */
+  TTTaskInfo getTask(TaskID taskID) throws IOException;
+
+  /**
+   * Checks whether any process in the task's process tree is alive. <br/>
+   * 
+   * @param pid
+   *          the pid of the task attempt
+   * @return true if task process tree is alive.
+   * @throws IOException in case of errors
+   */
+  boolean isProcessTreeAlive(String pid) throws IOException;
+}
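
A sketch of the polling idiom these methods support, the same pattern TestCluster uses further below; `proxy` is assumed to be a connected TTProtocol and `taskId` a known TaskID, inside a test method declared to throw Exception:

    // Poll until the task attempt reports a pid, giving up after 40 seconds.
    TTTaskInfo info = proxy.getTask(taskId);
    for (int i = 0;
         (info.getPid() == null || info.getPid().isEmpty()) && i < 40; i++) {
      Thread.sleep(1000);
      info = proxy.getTask(taskId);
    }
    // Once the job is done, the task's process tree should eventually die.
    boolean alive = proxy.isProcessTreeAlive(info.getPid());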

Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * Task state information as seen by the TT.
+ */
+public interface TTTaskInfo extends Writable {
+
+  /**
+   * Has the task occupied a slot? A task occupies a slot once it starts
+   * localizing on the {@link TaskTracker}.<br/>
+   * 
+   * @return true if task has started occupying a slot.
+   */
+  boolean slotTaken();
+
+  /**
+   * Has the task been killed? <br/>
+   * 
+   * @return true, if task has been killed.
+   */
+  boolean wasKilled();
+
+  /**
+   * Gets the task status as seen in this particular TaskTracker's view of
+   * the task.<br/>
+   * 
+   * @return status of the particular task
+   */
+  TaskStatus getTaskStatus();
+  
+  /**
+   * Gets the configuration object of the task.
+   * @return configuration of the task.
+   */
+  Configuration getConf();
+  
+  /**
+   * Gets the user of the task.
+   * @return user who owns the task.
+   */
+  String getUser();
+  
+  /**
+   * States whether the task is a task-cleanup task.
+   * @return true if it is a task-cleanup task.
+   */
+  boolean isTaskCleanupTask();
+
+  /**
+   * Gets the pid of the running task on the task-tracker.
+   * 
+   * @return pid of the task.
+   */
+  String getPid();
+}
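
A sketch of how these flags combine when inspecting a task on the TT side; `ttTaskInfo` is assumed to have been fetched via TTProtocol.getTask(), as in TestCluster below:

    if (ttTaskInfo.slotTaken() && !ttTaskInfo.wasKilled()
        && !ttTaskInfo.isTaskCleanupTask()) {
      // A live, user-visible task: its owner, pid and status are meaningful.
      System.out.println(ttTaskInfo.getUser() + " runs pid "
          + ttTaskInfo.getPid() + " at progress "
          + ttTaskInfo.getTaskStatus().getProgress());
    }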

Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Task state information of a TaskInProgress as seen by the {@link JobTracker}.
+ */
+public interface TaskInfo extends Writable {
+  /**
+   * Gets the task id of the TaskInProgress.
+   * 
+   * @return id of the task.
+   */
+  TaskID getTaskID();
+
+  /**
+   * Number of times task attempts have failed for the given TaskInProgress.
+   * <br/>
+   * 
+   * @return number of failed task attempts.
+   */
+  int numFailedAttempts();
+
+  /**
+   * Number of times task attempts have been killed for the given
+   * TaskInProgress. <br/>
+   * 
+   * @return number of killed task attempts.
+   */
+  int numKilledAttempts();
+
+  /**
+   * Gets the progress of the task as a fraction, in the range 0.0-1.0.
+   * <br/>
+   * 
+   * @return progress of the task as a fraction.
+   */
+  double getProgress();
+
+  /**
+   * Number of attempts currently running for the given TaskInProgress.<br/>
+   * 
+   * @return number of running attempts.
+   */
+  int numRunningAttempts();
+
+  /**
+   * Array of TaskStatus objects that are related to the corresponding
+   * TaskInProgress object. The task status of the tip is only populated
+   * once a tracker reports back the task status.<br/>
+   * 
+   * @return list of task statuses.
+   */
+  TaskStatus[] getTaskStatus();
+
+  /**
+   * Gets a list of trackers on which the task attempts are scheduled/running.
+   * Can be empty if the task attempt has succeeded. <br/>
+   * 
+   * @return list of trackers
+   */
+  String[] getTaskTrackers();
+
+  /**
+   * States whether the current TaskInProgress is a setup or cleanup tip. <br/>
+   * 
+   * @return true if setup/cleanup
+   */
+  boolean isSetupOrCleanup();
+}
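
A sketch summarizing a job's tasks through this interface; `wovenClient` is assumed to be a JTProtocol proxy and `jobId` a JobID, mirroring the names used in the tests below:

    for (TaskInfo t : wovenClient.getTaskInfo(jobId)) {
      if (!t.isSetupOrCleanup()) {     // skip housekeeping tips
        System.out.printf("%s progress=%.2f running=%d failed=%d killed=%d%n",
            t.getTaskID(), t.getProgress(), t.numRunningAttempts(),
            t.numFailedAttempts(), t.numKilledAttempts());
      }
    }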

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+
+import org.junit.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCluster {
+
+  private static final Log LOG = LogFactory.getLog(TestCluster.class);
+
+  private static MRCluster cluster;
+
+  public TestCluster() throws Exception {
+
+  }
+
+  @BeforeClass
+  public static void before() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    cluster.tearDown();
+  }
+
+  @Test
+  public void testProcessInfo() throws Exception {
+    LOG.info("Process info of JobTracker is : "
+        + cluster.getJTClient().getProcessInfo());
+    Assert.assertNotNull(cluster.getJTClient().getProcessInfo());
+    Collection<TTClient> tts = cluster.getTTClients();
+    for (TTClient tt : tts) {
+      LOG.info("Process info of TaskTracker is : " + tt.getProcessInfo());
+      Assert.assertNotNull(tt.getProcessInfo());
+    }
+  }
+
+  @Test
+  public void testJobSubmission() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    Job rJob = job.createJob(1, 1, 100, 100, 100, 100);
+    rJob = cluster.getJTClient().submitAndVerifyJob(rJob);
+    cluster.getJTClient().verifyJobHistory(rJob.getJobID());
+  }
+
+  // @Test
+  public void testFileStatus() throws Exception {
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(cluster
+            .getJTClient().getProxy().getDaemonUser());
+    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        MRCluster myCluster = null;
+        try {
+          myCluster = MRCluster.createCluster(cluster.getConf());
+          myCluster.connect();
+          JTClient jt = myCluster.getJTClient();
+          String dir = ".";
+          checkFileStatus(jt.getFileStatus(dir, true));
+          checkFileStatus(jt.listStatus(dir, false, true), dir);
+          for (TTClient tt : myCluster.getTTClients()) {
+            String[] localDirs = tt.getMapredLocalDirs();
+            for (String localDir : localDirs) {
+              checkFileStatus(tt.listStatus(localDir, true, false), localDir);
+              checkFileStatus(tt.listStatus(localDir, true, true), localDir);
+            }
+          }
+          String systemDir = jt.getClient().getSystemDir().toString();
+          checkFileStatus(jt.listStatus(systemDir, false, true), systemDir);
+          checkFileStatus(jt.listStatus(jt.getLogDir(), true, true), jt
+              .getLogDir());
+        } finally {
+          if (myCluster != null) {
+            myCluster.disconnect();
+          }
+        }
+        return null;
+      }
+    });
+  }
+
+  private void checkFileStatus(FileStatus[] fs, String path) {
+    Assert.assertNotNull(fs);
+    LOG.info("-----Listing for " + path + "  " + fs.length);
+    for (FileStatus fz : fs) {
+      checkFileStatus(fz);
+    }
+  }
+
+  private void checkFileStatus(FileStatus fz) {
+    Assert.assertNotNull(fz);
+    LOG.info("FileStatus is "
+        + fz.getPath() + "  " + fz.getPermission() + "  " + fz.getOwner()
+        + "  " + fz.getGroup() + "  " + fz.getClass());
+  }
+
+  /**
+   * Test to verify the common properties of tasks.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testTaskDetails() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    FinishTaskControlAction.configureControlActionForJob(conf);
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+
+    Job rJob = job.createJob(1, 1, 100, 100, 100, 100);
+    JobClient client = cluster.getJTClient().getClient();
+    rJob.submit();
+    RunningJob rJob1 =
+        client.getJob(org.apache.hadoop.mapred.JobID.downgrade(rJob.getJobID()));
+    JobID id = rJob.getJobID();
+
+    JobInfo jInfo = wovenClient.getJobInfo(id);
+
+    while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+      Thread.sleep(1000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+
+    LOG.info("Waiting till job starts running one map");
+
+    TaskInfo[] myTaskInfos = wovenClient.getTaskInfo(id);
+    boolean isOneTaskStored = false;
+    String sometaskpid = null;
+    org.apache.hadoop.mapreduce.TaskAttemptID sometaskId = null;
+    TTClient myCli = null;
+    for (TaskInfo info : myTaskInfos) {
+      if (!info.isSetupOrCleanup()) {
+        String[] taskTrackers = info.getTaskTrackers();
+        for (String taskTracker : taskTrackers) {
+          TTInfo ttInfo = wovenClient.getTTInfo(taskTracker);
+          TTClient ttCli = cluster.getTTClient(ttInfo.getStatus().getHost());
+          TaskID taskId = info.getTaskID();
+          TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(taskId);
+          Assert.assertNotNull(ttTaskInfo);
+          Assert.assertNotNull(ttTaskInfo.getConf());
+          Assert.assertNotNull(ttTaskInfo.getUser());
+          Assert.assertTrue(ttTaskInfo.getTaskStatus().getProgress() >= 0.0);
+          Assert.assertTrue(ttTaskInfo.getTaskStatus().getProgress() <= 1.0);
+          // Get the pid of the task attempt. The task need not have
+          // reported its pid by the time we check, so poll until the
+          // reported pid is non-empty.
+          String pid = ttTaskInfo.getPid();
+          int i = 1;
+          while (pid == null || pid.isEmpty()) {
+            Thread.sleep(1000);
+            LOG.info("Waiting for task to report its pid back");
+            ttTaskInfo = ttCli.getProxy().getTask(taskId);
+            pid = ttTaskInfo.getPid();
+            if (i == 40) {
+              Assert.fail("The task pid not reported for 40 seconds.");
+            }
+            i++;
+          }
+          if (!isOneTaskStored) {
+            sometaskpid = pid;
+            sometaskId = ttTaskInfo.getTaskStatus().getTaskID();
+            myCli = ttCli;
+            isOneTaskStored = true;
+          }
+          LOG.info("verified task progress to be between 0 and 1");
+          State state = ttTaskInfo.getTaskStatus().getRunState();
+          if (ttTaskInfo.getTaskStatus().getProgress() < 1.0
+              && ttTaskInfo.getTaskStatus().getProgress() > 0.0) {
+            Assert.assertEquals(TaskStatus.State.RUNNING, state);
+            LOG.info("verified run state as " + state);
+          }
+          FinishTaskControlAction action =
+              new FinishTaskControlAction(org.apache.hadoop.mapred.TaskID
+                  .downgrade(info.getTaskID()));
+          ttCli.getProxy().sendAction(action);
+        }
+      }
+    }
+    rJob.killJob();
+    int i = 1;
+    while (!rJob.isComplete()) {
+      Thread.sleep(1000);
+      if (i == 40) {
+        Assert.fail(
+            "The job did not complete within 40 seconds of being killed.");
+      }
+      i++;
+    }
+    TTTaskInfo myTaskInfo = myCli.getProxy().getTask(sometaskId.getTaskID());
+    i = 0;
+    while (myTaskInfo != null && !myTaskInfo.getPid().isEmpty()) {
+      LOG.info("sleeping till task is retired from TT memory");
+      Thread.sleep(1000);
+      myTaskInfo = myCli.getProxy().getTask(sometaskId.getTaskID());
+      if (i == 40) {
+        Assert.fail("Task not retired from TT memory within 40 seconds "
+            + "of the job completing.");
+      }
+      i++;
+    }
+    Assert.assertFalse(myCli.getProxy().isProcessTreeAlive(sometaskpid));
+  }
+
+  @Test
+  public void testClusterRestart() throws Exception {
+    cluster.stop();
+    // Give the cluster time to stop completely.
+    AbstractDaemonClient cli = cluster.getJTClient();
+    int i = 1;
+    while (i < 40) {
+      try {
+        cli.ping();
+        Thread.sleep(1000);
+        i++;
+      } catch (Exception e) {
+        break;
+      }
+    }
+    if (i >= 40) {
+      Assert.fail("JT on " + cli.getHostName() + " Should have been down.");
+    }
+    i = 1;
+    for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+      i = 1;
+      while (i < 40) {
+        try {
+          tcli.ping();
+          Thread.sleep(1000);
+          i++;
+        } catch (Exception e) {
+          break;
+        }
+      }
+      if (i >= 40) {
+        Assert.fail("TT on " + tcli.getHostName() + " Should have been down.");
+      }
+    }
+    cluster.start();
+    cli = cluster.getJTClient();
+    i = 1;
+    while (i < 40) {
+      try {
+        cli.ping();
+        break;
+      } catch (Exception e) {
+        i++;
+        Thread.sleep(1000);
+        LOG.info("Waiting for Jobtracker on host : "
+            + cli.getHostName() + " to come up.");
+      }
+    }
+    if (i >= 40) {
+      Assert.fail("JT on " + cli.getHostName() + " Should have been up.");
+    }
+    for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+      i = 1;
+      while (i < 40) {
+        try {
+          tcli.ping();
+          break;
+        } catch (Exception e) {
+          i++;
+          Thread.sleep(1000);
+          LOG.info("Waiting for Tasktracker on host : "
+              + tcli.getHostName() + " to come up.");
+        }
+      }
+      if (i >= 40) {
+        Assert.fail("TT on " + tcli.getHostName() + " Should have been Up.");
+      }
+    }
+  }
+}
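
The restart test above repeats the same bounded wait loop four times; a hedged sketch of a helper it could be factored into (the helper name is invented for illustration):

    // Hypothetical helper: wait up to 40 seconds for a daemon to reach a state.
    private static void waitForDaemon(AbstractDaemonClient cli, boolean expectUp)
        throws InterruptedException {
      for (int i = 1; i <= 40; i++) {
        try {
          cli.ping();
          if (expectUp) return;        // ping answered: the daemon is up
        } catch (Exception e) {
          if (!expectUp) return;       // ping failed: the daemon is down
        }
        Thread.sleep(1000);
      }
      Assert.fail(cli.getHostName() + " did not come "
          + (expectUp ? "up" : "down") + " within 40 seconds.");
    }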

Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestControlledJob {
+  private MRCluster cluster;
+
+  private static final Log LOG = LogFactory.getLog(TestControlledJob.class);
+
+  public TestControlledJob() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+  }
+
+  @Before
+  public void before() throws Exception {
+    cluster.setUp();
+  }
+
+  @After
+  public void after() throws Exception {
+    cluster.tearDown();
+  }
+
+  @Test
+  public void testControlledJob() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    FinishTaskControlAction.configureControlActionForJob(conf);
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+
+    Job slpJob = job.createJob(1, 0, 100, 100, 100, 100);
+    slpJob.submit();
+    JobClient client = cluster.getJTClient().getClient();
+
+    RunningJob rJob =
+        client.getJob(org.apache.hadoop.mapred.JobID.downgrade(slpJob
+            .getJobID()));
+    JobID id = rJob.getID();
+
+    JobInfo jInfo = wovenClient.getJobInfo(id);
+
+    while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+      Thread.sleep(1000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+
+    LOG.info("Waiting till job starts running one map");
+    jInfo = wovenClient.getJobInfo(id);
+    Assert.assertEquals(1, jInfo.runningMaps());
+
+    LOG.info("waiting for another cycle to "
+        + "check if the maps dont finish off");
+    Thread.sleep(1000);
+    jInfo = wovenClient.getJobInfo(id);
+    Assert.assertEquals(1, jInfo.runningMaps());
+
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(id);
+
+    for (TaskInfo info : taskInfos) {
+      LOG.info("constructing control action to signal task to finish");
+      FinishTaskControlAction action =
+          new FinishTaskControlAction(TaskID.downgrade(info.getTaskID()));
+      for (TTClient cli : cluster.getTTClients()) {
+        cli.getProxy().sendAction(action);
+      }
+    }
+
+    jInfo = wovenClient.getJobInfo(id);
+    int i = 1;
+    if (jInfo != null) {
+      while (!jInfo.getStatus().isJobComplete()) {
+        Thread.sleep(1000);
+        jInfo = wovenClient.getJobInfo(id);
+        if (jInfo == null) {
+          break;
+        }
+        if (i > 40) {
+          Assert.fail("Controlled Job with ID : "
+              + jInfo.getID()
+              + " has not completed in 40 seconds after signalling.");
+        }
+        i++;
+      }
+    }
+    LOG.info("Job sucessfully completed after signalling!!!!");
+  }
+}