You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2010/06/21 21:02:51 UTC
svn commit: r956666 [2/4] - in /hadoop/mapreduce/trunk: ./ ivy/
src/test/aop/build/ src/test/mapred/org/apache/hadoop/mapred/
src/test/mapred/testjar/ src/test/system/ src/test/system/aop/
src/test/system/aop/org/ src/test/system/aop/org/apache/ src/te...
Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+
+public privileged aspect ClusterAspect {
+ // Inter-type declaration: gives Cluster a public getter that returns its client field.
+ public ClientProtocol Cluster.getClientProtocol() {
+ return client;
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml (added)
+++ hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml Mon Jun 21 19:02:49 2010
@@ -0,0 +1,101 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+<!-- Mandatory properties that are to be set and uncommented before running the tests -->
+
+<property>
+ <name>test.system.hdrc.hadoophome</name>
+ <value>$(TO_DO_HADOOP_INSTALL)/share/hadoop-current</value>
+ <description> This is the path to the home directory of the hadoop deployment.
+ </description>
+</property>
+<property>
+ <name>test.system.hdrc.hadoopconfdir</name>
+ <value>$(TO_DO_HADOOP_INSTALL)/conf/hadoop</value>
+ <description> This is the path to the configuration directory of the hadoop
+ cluster that is deployed.
+ </description>
+</property>
+
+<property>
+ <name>test.system.hdrc.tt.hostfile</name>
+ <value>slaves.localcopy.txt</value>
+ <description> File name containing the hostnames where the TaskTrackers are running.
+ </description>
+</property>
+
+<property>
+ <name>test.system.mr.clusterprocess.impl.class</name>
+ <value>org.apache.hadoop.mapreduce.test.system.MRCluster$MRProcessManager</value>
+ <description>
+ Cluster process manager for the Mapreduce subsystem of the cluster. The value
+ org.apache.hadoop.mapreduce.test.system.MRCluster$MultiMRProcessManager can
+ be used to enable multi user support.
+ </description>
+</property>
+
+<property>
+ <name>test.system.hdrc.deployed.scripts.dir</name>
+ <value>./src/test/system/scripts</value>
+ <description>
+ This directory hosts the scripts in the deployed location where
+ the system test client runs.
+ </description>
+</property>
+
+<property>
+ <name>test.system.hdrc.hadoopnewconfdir</name>
+ <value>$(TO_DO_GLOBAL_TMP_DIR)/newconf</value>
+ <description>
+ The directory to which the new config files will be copied in all
+ the clusters.
+ </description>
+</property>
+
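+<!-- Mandatory keys to be set for multi-user support to be enabled. Note: test.system.mr.clusterprocess.impl.class is declared a second time below; confirm which value is meant to take effect. -->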
+
+ <property>
+ <name>test.system.mr.clusterprocess.impl.class</name>
+ <value>org.apache.hadoop.mapreduce.test.system.MRCluster$MultiMRProcessManager</value>
+ <description>
+ Enables the multi-user cluster process manager.
+ </description>
+</property>
+
+<property>
+ <name>test.system.hdrc.multi-user.binary.path</name>
+ <value>$(TO_DO_HADOOP_INSTALL)/conf/hadoop/runAs</value>
+ <description>
+ Local file system path on the gateway to the cluster-controller binary, including the binary name.
+ To build the binary the following commands need to be executed:
+ % ant run-as -Drun-as.hadoop.home.dir=(HADOOP_HOME of setup cluster)
+ % cp build-fi/system/c++-build/runAs test.system.hdrc.multi-user.binary.path
+ The location of the cluster is an important security precaution.
+ The binary should be owned by root, and the test user group permission should be set in such a
+ way that the group can execute the binary. Example usage would be:
+ % sudo chown root binary
+ % sudo chmod 6511 binary
+ Change permission appropriately to make it more secure.
+ </description>
+</property>
+
+<property>
+ <name>test.system.hdrc.multi-user.managinguser.jobtracker</name>
+ <value>*</value>
+ <description>
+ User that manages the particular daemon. Please note that this user should also be
+ present on the gateways. An example configuration for the above would be
+ key name = test.system.hdrc.multi-user.managinguser.jobtracker
+ key value = guest
+ Please note the daemon names are all lower case, corresponding to hadoop-daemon.sh command.
+ </description>
+</property>
+<property>
+ <name>test.system.hdrc.multi-user.managinguser.tasktracker</name>
+ <value>*</value>
+</property>
+
+</configuration>
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+
+/**
+ * Concrete implementation of the JobInfo interface which is exposed to the
+ * clients.
+ * Look at {@link JobInfo} for further details.
+ */
+class JobInfoImpl implements JobInfo {
+
+ private List<String> blackListedTracker;
+ private String historyUrl;
+ private JobID id;
+ private boolean setupLaunched;
+ private boolean setupFinished;
+ private boolean cleanupLaunched;
+ private JobStatus status;
+ private int runningMaps;
+ private int runningReduces;
+ private int waitingMaps;
+ private int waitingReduces;
+ private int finishedMaps;
+ private int finishedReduces;
+ private int numMaps;
+ private int numReduces;
+ private boolean historyCopied;
+
+ public JobInfoImpl() {
+ id = new JobID();
+ status = new JobStatus();
+ blackListedTracker = new LinkedList<String>();
+ historyUrl = "";
+ }
+
+ public JobInfoImpl(
+ JobID id, boolean setupLaunched, boolean setupFinished,
+ boolean cleanupLaunched, int runningMaps, int runningReduces,
+ int waitingMaps, int waitingReduces, int finishedMaps,
+ int finishedReduces, JobStatus status, String historyUrl,
+ List<String> blackListedTracker, boolean isComplete, int numMaps,
+ int numReduces, boolean historyCopied) {
+ super();
+ this.blackListedTracker = blackListedTracker;
+ this.historyUrl = historyUrl;
+ this.id = id;
+ this.setupLaunched = setupLaunched;
+ this.setupFinished = setupFinished;
+ this.cleanupLaunched = cleanupLaunched;
+ this.status = status;
+ this.runningMaps = runningMaps;
+ this.runningReduces = runningReduces;
+ this.waitingMaps = waitingMaps;
+ this.waitingReduces = waitingReduces;
+ this.finishedMaps = finishedMaps;
+ this.finishedReduces = finishedReduces;
+ this.numMaps = numMaps;
+ this.numReduces = numReduces;
+ this.historyCopied = historyCopied;
+ }
+
+ @Override
+ public List<String> getBlackListedTrackers() {
+ return blackListedTracker;
+ }
+
+ @Override
+ public String getHistoryUrl() {
+ return historyUrl;
+ }
+
+ @Override
+ public JobID getID() {
+ return id;
+ }
+
+ @Override
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public boolean isCleanupLaunched() {
+ return cleanupLaunched;
+ }
+
+ @Override
+ public boolean isSetupLaunched() {
+ return setupLaunched;
+ }
+
+ @Override
+ public boolean isSetupFinished() {
+ return setupFinished;
+ }
+
+ @Override
+ public int runningMaps() {
+ return runningMaps;
+ }
+
+ @Override
+ public int runningReduces() {
+ return runningReduces;
+ }
+
+ @Override
+ public int waitingMaps() {
+ return waitingMaps;
+ }
+
+ @Override
+ public int waitingReduces() {
+ return waitingReduces;
+ }
+
+ @Override
+ public int finishedMaps() {
+ return finishedMaps;
+ }
+
+ @Override
+ public int finishedReduces() {
+ return finishedReduces;
+ }
+
+ @Override
+ public int numMaps() {
+ return numMaps;
+ }
+
+ @Override
+ public int numReduces() {
+ return numReduces;
+ }
+
+ @Override
+ public boolean isHistoryFileCopied() {
+ return historyCopied;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+   id.readFields(in);
+   setupLaunched = in.readBoolean();
+   setupFinished = in.readBoolean();
+   cleanupLaunched = in.readBoolean();
+   status.readFields(in);
+   runningMaps = in.readInt();
+   runningReduces = in.readInt();
+   waitingMaps = in.readInt();
+   waitingReduces = in.readInt();
+   historyUrl = in.readUTF();
+   blackListedTracker = new LinkedList<String>(); // replace, never append to, earlier contents
+   for (int left = in.readInt(); left > 0; left--) { // entry count, then that many names
+     blackListedTracker.add(in.readUTF());
+   }
+   finishedMaps = in.readInt();
+   finishedReduces = in.readInt();
+   numMaps = in.readInt();
+   numReduces = in.readInt();
+   historyCopied = in.readBoolean();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ id.write(out);
+ out.writeBoolean(setupLaunched);
+ out.writeBoolean(setupFinished);
+ out.writeBoolean(cleanupLaunched);
+ status.write(out);
+ out.writeInt(runningMaps);
+ out.writeInt(runningReduces);
+ out.writeInt(waitingMaps);
+ out.writeInt(waitingReduces);
+ out.writeUTF(historyUrl);
+ out.writeInt(blackListedTracker.size());
+ for (String str : blackListedTracker) {
+ out.writeUTF(str);
+ }
+ out.writeInt(finishedMaps);
+ out.writeInt(finishedReduces);
+ out.writeInt(numMaps);
+ out.writeInt(numReduces);
+ out.writeBoolean(historyCopied);
+ }
+
+
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+
+/**
+ * Concrete implementation of the TaskTracker information which is passed to
+ * the client from JobTracker.
+ * Look at {@link TTInfo}
+ */
+
+class TTInfoImpl implements TTInfo {
+
+ private String taskTrackerName;
+ private TaskTrackerStatus status;
+
+ public TTInfoImpl() {
+ taskTrackerName = "";
+ status = new TaskTrackerStatus();
+ }
+
+ public TTInfoImpl(String taskTrackerName, TaskTrackerStatus status) {
+ super();
+ this.taskTrackerName = taskTrackerName;
+ this.status = status;
+ }
+
+ @Override
+ public String getName() {
+ return taskTrackerName;
+ }
+
+ @Override
+ public TaskTrackerStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskTrackerName = in.readUTF();
+ status.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(taskTrackerName);
+ status.write(out);
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+/**
+ * Abstract class which passes the Task view of the TaskTracker to the client.
+ * See {@link TTInfoImpl} for further details.
+ *
+ */
+abstract class TTTaskInfoImpl implements TTTaskInfo {
+
+ private boolean slotTaken;
+ private boolean wasKilled;
+ TaskStatus status;
+ Configuration conf;
+ String user;
+ boolean isTaskCleanupTask;
+ private String pid;
+
+ public TTTaskInfoImpl() {
+ }
+
+ public TTTaskInfoImpl(boolean slotTaken, boolean wasKilled,
+ TaskStatus status, Configuration conf, String user,
+ boolean isTaskCleanupTask, String pid) {
+ super();
+ this.slotTaken = slotTaken;
+ this.wasKilled = wasKilled;
+ this.status = status;
+ this.conf = conf;
+ this.user = user;
+ this.isTaskCleanupTask = isTaskCleanupTask;
+ this.pid = pid;
+ }
+
+ @Override
+ public boolean slotTaken() {
+ return slotTaken;
+ }
+
+ @Override
+ public boolean wasKilled() {
+ return wasKilled;
+ }
+
+ @Override
+ public abstract TaskStatus getTaskStatus();
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public String getUser() {
+ return user;
+ }
+
+ @Override
+ public boolean isTaskCleanupTask() {
+ return isTaskCleanupTask;
+ }
+
+ @Override
+ public String getPid() {
+ return pid;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ slotTaken = in.readBoolean();
+ wasKilled = in.readBoolean();
+ conf = new Configuration();
+ conf.readFields(in);
+ user = in.readUTF();
+ isTaskCleanupTask = in.readBoolean();
+ pid = in.readUTF();
+ }
+
+ /**
+  * Serializes the fields in the order the readFields implementations consume
+  * them; the task status goes last because the subclasses read it themselves.
+  */
+ @Override
+ public void write(DataOutput out) throws IOException {
+   out.writeBoolean(slotTaken);
+   out.writeBoolean(wasKilled);
+   conf.write(out);
+   out.writeUTF(user);
+   out.writeBoolean(isTaskCleanupTask);
+   out.writeUTF(pid == null ? "" : pid); // a missing pid is sent as ""
+   status.write(out);
+ }
+
+ static class MapTTTaskInfo extends TTTaskInfoImpl {
+
+ public MapTTTaskInfo() {
+ super();
+ }
+
+ public MapTTTaskInfo(boolean slotTaken, boolean wasKilled,
+ MapTaskStatus status, Configuration conf, String user,
+ boolean isTaskCleanup,String pid) {
+ super(slotTaken, wasKilled, status, conf, user, isTaskCleanup, pid);
+ }
+
+ @Override
+ public TaskStatus getTaskStatus() {
+ return status;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ status = new MapTaskStatus();
+ status.readFields(in);
+ }
+ }
+
+ static class ReduceTTTaskInfo extends TTTaskInfoImpl {
+
+ public ReduceTTTaskInfo() {
+ super();
+ }
+
+ public ReduceTTTaskInfo(boolean slotTaken, boolean wasKilled,
+ ReduceTaskStatus status, Configuration conf, String user,
+ boolean isTaskCleanup, String pid) {
+ super(slotTaken, wasKilled, status, conf, user, isTaskCleanup, pid);
+ }
+
+ @Override
+ public TaskStatus getTaskStatus() {
+ return status;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ status = new ReduceTaskStatus();
+ status.readFields(in);
+ }
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+
+/**
+ * Concrete class to expose out the task related information to the Clients from
+ * the JobTracker. Look at {@link TaskInfo} for further details.
+ */
+class TaskInfoImpl implements TaskInfo {
+
+ private double progress;
+ private TaskID taskID;
+ private int killedAttempts;
+ private int failedAttempts;
+ private int runningAttempts;
+ private TaskStatus[] taskStatus;
+ private boolean setupOrCleanup;
+ private String[] taskTrackers;
+
+ public TaskInfoImpl() {
+ taskID = new TaskID();
+ }
+
+ /** Null taskStatus or taskTrackers are stored as empty arrays so write() can read their lengths. */
+ public TaskInfoImpl(
+     TaskID taskID, double progress, int runningAttempts, int killedAttempts,
+     int failedAttempts, TaskStatus[] taskStatus, boolean setupOrCleanup,
+     String[] taskTrackers) {
+   this.progress = progress;
+   this.taskID = taskID;
+   this.killedAttempts = killedAttempts;
+   this.failedAttempts = failedAttempts;
+   this.runningAttempts = runningAttempts;
+   if (taskStatus != null) {
+     this.taskStatus = taskStatus;
+   } else if (taskID.getTaskType() == TaskType.MAP) {
+     this.taskStatus = new MapTaskStatus[0];
+   } else {
+     this.taskStatus = new ReduceTaskStatus[0];
+   }
+   this.setupOrCleanup = setupOrCleanup;
+   // write() dereferences taskTrackers.length, so never keep a null here.
+   this.taskTrackers = taskTrackers != null ? taskTrackers : new String[0];
+ }
+
+ @Override
+ public double getProgress() {
+ return progress;
+ }
+
+ @Override
+ public TaskID getTaskID() {
+ return taskID;
+ }
+
+ @Override
+ public int numKilledAttempts() {
+ return killedAttempts;
+ }
+
+ @Override
+ public int numFailedAttempts() {
+ return failedAttempts;
+ }
+
+ @Override
+ public int numRunningAttempts() {
+ return runningAttempts;
+ }
+
+ /**
+  * Restores this object from the stream, consuming the fields in the order
+  * in which write() emits them. The map or reduce status class is picked
+  * from the task type of the task id that was just read.
+  */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+   taskID.readFields(in);
+   progress = in.readDouble();
+   runningAttempts = in.readInt();
+   killedAttempts = in.readInt();
+   failedAttempts = in.readInt();
+   final int statusCount = in.readInt();
+   final boolean mapTask = taskID.getTaskType() == TaskType.MAP;
+   taskStatus = mapTask
+       ? new MapTaskStatus[statusCount] : new ReduceTaskStatus[statusCount];
+   // Each status is followed on the wire by its task tracker name.
+   for (int s = 0; s < statusCount; s++) {
+     taskStatus[s] = mapTask ? new MapTaskStatus() : new ReduceTaskStatus();
+     taskStatus[s].readFields(in);
+     taskStatus[s].setTaskTracker(in.readUTF());
+   }
+   setupOrCleanup = in.readBoolean();
+   final int trackerCount = in.readInt();
+   taskTrackers = new String[trackerCount];
+   for (int t = 0; t < trackerCount; t++) {
+     taskTrackers[t] = in.readUTF();
+   }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskID.write(out);
+ out.writeDouble(progress);
+ out.writeInt(runningAttempts);
+ out.writeInt(killedAttempts);
+ out.writeInt(failedAttempts);
+ out.writeInt(taskStatus.length);
+ for (TaskStatus t : taskStatus) {
+ t.write(out);
+ out.writeUTF(t.getTaskTracker());
+ }
+ out.writeBoolean(setupOrCleanup);
+ out.writeInt(taskTrackers.length);
+ for (String tt : taskTrackers) {
+ out.writeUTF(tt);
+ }
+ }
+
+ @Override
+ public TaskStatus[] getTaskStatus() {
+ return taskStatus;
+ }
+
+ @Override
+ public boolean isSetupOrCleanup() {
+ return setupOrCleanup;
+ }
+
+ @Override
+ public String[] getTaskTrackers() {
+ return taskTrackers;
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.test.system.ControlAction;
+
+/**
+ * Control Action which signals a controlled task to proceed to completion. <br/>
+ */
+public class FinishTaskControlAction extends ControlAction<TaskID> {
+
+ private static final String ENABLE_CONTROLLED_TASK_COMPLETION =
+ "test.system.enabled.task.completion.control";
+
+ /**
+ * Create a default control action. <br/>
+ *
+ */
+ public FinishTaskControlAction() {
+ super(new TaskID());
+ }
+
+ /**
+ * Create a control action specific to a particular task. <br/>
+ *
+ * @param id
+ * of the task.
+ */
+ public FinishTaskControlAction(TaskID id) {
+ super(id);
+ }
+
+ /**
+ * Sets up the job to be controlled using the finish task control action.
+ * <br/>
+ *
+ * @param conf
+ * configuration to be used to submit the job.
+ */
+ public static void configureControlActionForJob(Configuration conf) {
+ conf.setBoolean(ENABLE_CONTROLLED_TASK_COMPLETION, true);
+ }
+
+ /**
+ * Checks if the control action is enabled in the passed configuration. <br/>
+ * @param conf configuration
+ * @return true if action is enabled.
+ */
+ public static boolean isControlActionEnabled(Configuration conf) {
+ return conf.getBoolean(ENABLE_CONTROLLED_TASK_COMPLETION, false);
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import static org.junit.Assert.*;
+
+/**
+ * JobTracker client for system tests.
+ */
+public class JTClient extends MRDaemonClient<JTProtocol> {
+ static final Log LOG = LogFactory.getLog(JTClient.class);
+ private JobClient client;
+
+ /**
+ * Create JobTracker client to talk to {@link JobTracker} specified in the
+ * configuration. <br/>
+ *
+ * @param conf
+ * configuration used to create a client.
+ * @param daemon
+ * the process management instance for the {@link JobTracker}
+ * @throws IOException
+ */
+ public JTClient(Configuration conf, RemoteProcess daemon) throws IOException {
+ super(conf, daemon);
+ }
+
+ @Override
+ public synchronized void connect() throws IOException {
+ if (isConnected()) {
+ return;
+ }
+ client = new JobClient(new JobConf(getConf()));
+ setConnected(true);
+ }
+
+  /**
+   * Closes the underlying {@link JobClient}, if one has been created, and
+   * marks this client as disconnected so that a later {@link #connect()}
+   * builds a fresh {@link JobClient} instead of returning early. <br/>
+   *
+   * @throws IOException
+   *           if closing the job client fails.
+   */
+  @Override
+  public synchronized void disconnect() throws IOException {
+    if (client == null) {
+      // connect() has not been called yet; there is nothing to close.
+      return;
+    }
+    try {
+      client.close();
+    } finally {
+      // Reset the flag even when close() throws, otherwise connect() would
+      // keep handing out the closed client.
+      setConnected(false);
+    }
+  }
+
+ @Override
+ public synchronized JTProtocol getProxy() {
+ return (JTProtocol) client.getProtocol();
+ }
+
+ /**
+ * Gets the {@link JobClient} which can be used for job submission. JobClient
+ * which is returned would not contain the decorated API's. To be used for
+ * submitting of the job.
+ *
+ * @return client handle to the JobTracker
+ */
+ public JobClient getClient() {
+ return client;
+ }
+
+ /**
+ * Gets the configuration which the JobTracker is currently running.<br/>
+ *
+ * @return configuration of JobTracker.
+ *
+ * @throws IOException
+ */
+ public Configuration getJobTrackerConfig() throws IOException {
+ return getProxy().getDaemonConf();
+ }
+
+  /**
+   * Kills the job. <br/>
+   *
+   * @param id
+   *          of the job to be killed.
+   * @throws IOException
+   *           if the kill request fails, or if the calling thread is
+   *           interrupted while the request is in progress; in the latter
+   *           case the interrupt status of the thread is restored and the
+   *           {@link InterruptedException} is attached as the cause.
+   */
+  public void killJob(JobID id) throws IOException {
+    try {
+      getClient().killJob(id);
+    } catch (InterruptedException e) {
+      // Converting to IOException must not swallow the interrupt: restore
+      // the flag so callers further up can still observe it.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+ /**
+ * Verification API to check running jobs and running job states. users have
+ * to ensure that their jobs remain running state while verification is
+ * called. <br/>
+ *
+ * @param jobId
+ * of the job to be verified.
+ *
+ * @throws Exception
+ */
+ public void verifyRunningJob(JobID jobId) throws Exception {
+ }
+
+ private JobInfo getJobInfo(JobID jobId) throws IOException {
+ JobInfo info = getProxy().getJobInfo(jobId);
+ if (info == null && !getProxy().isJobRetired(jobId)) {
+ Assert.fail("Job id : " + jobId + " has never been submitted to JT");
+ }
+ return info;
+ }
+
+ /**
+ * Verification API which submits the job, waits till the job completes and
+ * verifies that its running and completed state is correct. <br/>
+ *
+ * @param job
+ * the job to be submitted and verified.
+ * @return job handle
+ * @throws Exception
+ */
+ public Job submitAndVerifyJob(Job job) throws Exception {
+ job.submit();
+ JobID jobId = job.getJobID();
+ verifyRunningJob(jobId);
+ verifyCompletedJob(jobId);
+ return job;
+ }
+
+  /**
+   * Verification API to check if the job completion state is correct. <br/>
+   * Polls once a second until the job is reported complete, verifies the job
+   * details, waits until the history file is reported as copied and finally
+   * verifies the job history.
+   *
+   * @param id
+   *          id of the job to be verified.
+   * @throws Exception
+   *           if a verification step fails or the waiting thread is
+   *           interrupted.
+   */
+  public void verifyCompletedJob(JobID id) throws Exception {
+    final org.apache.hadoop.mapred.JobID oldApiId =
+        org.apache.hadoop.mapred.JobID.downgrade(id);
+    RunningJob rJob = getClient().getJob(oldApiId);
+    // JobClient.getJob() returns null for a job id the JobTracker does not
+    // know (any more). Stop waiting in that case instead of failing with a
+    // NullPointerException; the checks below already cope with missing
+    // job information.
+    while (rJob != null && !rJob.isComplete()) {
+      LOG.info("waiting for job :" + id + " to retire");
+      Thread.sleep(1000);
+      rJob = getClient().getJob(oldApiId);
+    }
+    verifyJobDetails(id);
+    // Wait for the history file; a null JobInfo ends the wait.
+    JobInfo jobInfo = getJobInfo(id);
+    while (jobInfo != null && !jobInfo.isHistoryFileCopied()) {
+      Thread.sleep(1000);
+      LOG.info(id + " waiting for history file to copied");
+      jobInfo = getJobInfo(id);
+    }
+    verifyJobHistory(id);
+  }
+
+ /**
+ * Verification API to check if the job details are semantically correct.<br/>
+ * Waits for the job setup to finish, then checks the job id prefix, the
+ * task counts and the job progress. For a succeeded job it additionally
+ * checks that map/reduce progress reached 1 and that the tasks succeeded;
+ * for a completed job it checks that the cleanup was launched. <br/>
+ * The method returns nothing: it returns normally when the checks pass (or
+ * when the job information is no longer available) and a failed check
+ * surfaces through the assertion that failed.
+ *
+ * @param jobId
+ * jobID of the job
+ * @throws Exception
+ */
+ public void verifyJobDetails(JobID jobId) throws Exception {
+ // wait till the setup is launched and finished.
+ JobInfo jobInfo = getJobInfo(jobId);
+ if (jobInfo == null) {
+ return;
+ }
+ LOG.info("waiting for the setup to be finished");
+ // Poll every two seconds; stop polling as soon as the job information
+ // is no longer returned.
+ while (!jobInfo.isSetupFinished()) {
+ Thread.sleep(2000);
+ jobInfo = getJobInfo(jobId);
+ if (jobInfo == null) {
+ break;
+ }
+ }
+ // verify job id.
+ assertTrue(jobId.toString().startsWith("job_"));
+ LOG.info("verified job id and is : " + jobId.toString());
+ // verify the number of map/reduce tasks.
+ verifyNumTasks(jobId);
+ // should verify job progress.
+ verifyJobProgress(jobId);
+ // Re-fetch: the job information may have gone away during the checks above.
+ jobInfo = getJobInfo(jobId);
+ if (jobInfo == null) {
+ return;
+ }
+ if (jobInfo.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+ // verify if map/reduce progress reached 1.
+ jobInfo = getJobInfo(jobId);
+ if (jobInfo == null) {
+ return;
+ }
+ assertEquals(1.0, jobInfo.getStatus().mapProgress(), 0.001);
+ assertEquals(1.0, jobInfo.getStatus().reduceProgress(), 0.001);
+ // verify successful finish of tasks.
+ verifyAllTasksSuccess(jobId);
+ }
+ if (jobInfo.getStatus().isJobComplete()) {
+ // verify if the cleanup is launched.
+ jobInfo = getJobInfo(jobId);
+ if (jobInfo == null) {
+ return;
+ }
+ assertTrue(jobInfo.isCleanupLaunched());
+ LOG.info("Verified launching of cleanup");
+ }
+ }
+
+  /**
+   * Verification API to check that every task of the job which reports at
+   * least one status has a status in the SUCCEEDED state. <br/>
+   * Returns without checking when the job information is not available, or
+   * when no tasks are returned and the job is retired.
+   *
+   * @param jobId
+   *          of the job whose tasks are to be verified.
+   * @throws IOException
+   */
+  public void verifyAllTasksSuccess(JobID jobId) throws IOException {
+    if (getJobInfo(jobId) == null) {
+      return;
+    }
+
+    TaskInfo[] tasks = getProxy().getTaskInfo(jobId);
+    if (tasks.length == 0 && getProxy().isJobRetired(jobId)) {
+      LOG.info("Job has been retired from JT memory : " + jobId);
+      return;
+    }
+
+    for (TaskInfo task : tasks) {
+      TaskStatus[] statuses = task.getTaskStatus();
+      if (statuses == null || statuses.length == 0) {
+        // Nothing reported for this task; it is not checked.
+        continue;
+      }
+      boolean succeeded = false;
+      for (TaskStatus status : statuses) {
+        if (TaskStatus.State.SUCCEEDED.equals(status.getRunState())) {
+          succeeded = true;
+          break;
+        }
+      }
+      assertTrue(succeeded);
+    }
+    LOG.info("verified that none of the tasks failed.");
+  }
+
+ public void verifyJobProgress(JobID jobId) throws IOException {
+ JobInfo jobInfo;
+ jobInfo = getJobInfo(jobId);
+ if (jobInfo == null) {
+ return;
+ }
+ assertTrue(jobInfo.getStatus().mapProgress() >= 0
+ && jobInfo.getStatus().mapProgress() <= 1);
+ LOG.info("verified map progress and is "
+ + jobInfo.getStatus().mapProgress());
+ assertTrue(jobInfo.getStatus().reduceProgress() >= 0
+ && jobInfo.getStatus().reduceProgress() <= 1);
+ LOG.info("verified reduce progress and is "
+ + jobInfo.getStatus().reduceProgress());
+ }
+
+ public void verifyNumTasks(JobID jobId) throws IOException {
+ JobInfo jobInfo;
+ jobInfo = getJobInfo(jobId);
+ if (jobInfo == null) {
+ return;
+ }
+ assertEquals(jobInfo.numMaps(), (jobInfo.runningMaps()
+ + jobInfo.waitingMaps() + jobInfo.finishedMaps()));
+ LOG.info("verified number of map tasks and is " + jobInfo.numMaps());
+
+ assertEquals(jobInfo.numReduces(), (jobInfo.runningReduces()
+ + jobInfo.waitingReduces() + jobInfo.finishedReduces()));
+ LOG.info("verified number of reduce tasks and is " + jobInfo.numReduces());
+ }
+
+ /**
+ * Verification API to check if the job history file is semantically correct. <br/>
+ *
+ *
+ * @param jobId
+ * of the job to be verified.
+ * @throws IOException
+ */
+ public void verifyJobHistory(JobID jobId) throws IOException {
+ JobInfo info = getJobInfo(jobId);
+ String url = "";
+ if (info == null) {
+ LOG.info("Job has been retired from JT memory : " + jobId);
+ url = getProxy().getJobHistoryLocationForRetiredJob(jobId);
+ } else {
+ url = info.getHistoryUrl();
+ }
+ Path p = new Path(url);
+ if (p.toUri().getScheme().equals("file:/")) {
+ FileStatus st = getFileStatus(url, true);
+ Assert.assertNotNull("Job History file for "
+ + jobId + " not present " + "when job is completed", st);
+ } else {
+ FileStatus st = getFileStatus(url, false);
+ Assert.assertNotNull("Job History file for "
+ + jobId + " not present " + "when job is completed", st);
+ }
+ LOG.info("Verified the job history for the jobId : " + jobId);
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * Client side API's exposed from JobTracker.
+ */
+public interface JTProtocol extends DaemonProtocol {
+ long versionID = 1L;
+
+ /**
+ * Get the information pertaining to given job.<br/>
+ * The returned JobInfo object can be null when the
+ * job specified by the job id is retired from the
+ * JobTracker memory which happens after job is
+ * completed. <br/>
+ *
+ * @param jobID
+ * of the job for which information is required.
+ * @return information regarding the job, or null if the job is
+ * retired from JobTracker memory.
+ * @throws IOException
+ */
+ public JobInfo getJobInfo(JobID jobID) throws IOException;
+
+ /**
+ * Gets the information pertaining to a task. <br/>
+ * The returned TaskInfo object can be null when the
+ * task specified by the task id is retired
+ * from the JobTracker memory which happens after the
+ * job is completed. <br/>
+ * @param taskID
+ * of the task for which information is required.
+ * @return information regarding the task, or null if the
+ * task is retired from JobTracker memory.
+ * @throws IOException
+ */
+ public TaskInfo getTaskInfo(TaskID taskID) throws IOException;
+
+ /**
+ * Gets the information pertaining to a given TaskTracker. <br/>
+ * The returned TTInfo class can be null if the given TaskTracker
+ * information is removed from JobTracker memory which is done
+ * when the TaskTracker is marked lost by the JobTracker. <br/>
+ * @param trackerName
+ * of the tracker.
+ * @return information regarding the tracker, or null if the TaskTracker
+ * is marked lost by the JobTracker.
+ * @throws IOException
+ */
+ public TTInfo getTTInfo(String trackerName) throws IOException;
+
+ /**
+ * Gets a list of all available jobs with JobTracker.<br/>
+ *
+ * @return list of all jobs.
+ * @throws IOException
+ */
+ public JobInfo[] getAllJobInfo() throws IOException;
+
+ /**
+ * Gets a list of tasks pertaining to a job. <br/>
+ *
+ * @param jobID
+ * of the job.
+ *
+ * @return list of all tasks for the job.
+ * @throws IOException
+ */
+ public TaskInfo[] getTaskInfo(JobID jobID) throws IOException;
+
+ /**
+ * Gets a list of TaskTrackers which have reported to the JobTracker. <br/>
+ *
+ * @return list of all TaskTracker.
+ * @throws IOException
+ */
+ public TTInfo[] getAllTTInfo() throws IOException;
+
+ /**
+ * Checks if a given job is retired from the JobTracker's memory. <br/>
+ *
+ * @param jobID
+ * of the job
+ * @return true if job is retired.
+ * @throws IOException
+ */
+ boolean isJobRetired(JobID jobID) throws IOException;
+
+ /**
+ * Gets the location of the history file for a retired job. <br/>
+ *
+ * @param jobID
+ * of the job
+ * @return location of history file
+ * @throws IOException
+ */
+ String getJobHistoryLocationForRetiredJob(JobID jobID) throws IOException;
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Job state information as seen by the JobTracker.
+ */
+public interface JobInfo extends Writable {
+ /**
+ * Gets the JobId of the job.<br/>
+ *
+ * @return id of the job.
+ */
+ JobID getID();
+
+ /**
+ * Gets the current status of the job.<br/>
+ *
+ * @return status.
+ */
+ JobStatus getStatus();
+
+ /**
+ * Gets the history location of the job.<br/>
+ *
+ * @return the path to the history file.
+ */
+ String getHistoryUrl();
+
+ /**
+ * Gets the number of maps which are currently running for the job. <br/>
+ *
+ * @return number of maps running for the job.
+ */
+ int runningMaps();
+
+ /**
+ * Gets the number of reduces currently running for the job. <br/>
+ *
+ * @return number of reduces running for the job.
+ */
+ int runningReduces();
+
+ /**
+ * Gets the number of maps to be scheduled for the job. <br/>
+ *
+ * @return number of waiting maps.
+ */
+ int waitingMaps();
+
+ /**
+ * Gets the number of reduces to be scheduled for the job. <br/>
+ *
+ * @return number of waiting reduces.
+ */
+ int waitingReduces();
+
+ /**
+ * Gets the number of maps that are finished. <br/>
+ * @return the number of finished maps.
+ */
+ int finishedMaps();
+
+ /**
+ * Gets the number of map tasks that are to be spawned for the job <br/>
+ * @return the number of map tasks of the job.
+ */
+ int numMaps();
+
+ /**
+ * Gets the number of reduce tasks that are to be spawned for the job <br/>
+ * @return the number of reduce tasks of the job.
+ */
+ int numReduces();
+
+ /**
+ * Gets the number of reduces that are finished. <br/>
+ * @return the number of finished reduces.
+ */
+ int finishedReduces();
+
+ /**
+ * Gets if cleanup for the job has been launched.<br/>
+ *
+ * @return true if cleanup task has been launched.
+ */
+ boolean isCleanupLaunched();
+
+ /**
+ * Gets if the setup for the job has been launched.<br/>
+ *
+ * @return true if setup task has been launched.
+ */
+ boolean isSetupLaunched();
+
+ /**
+ * Gets if the setup for the job has been completed.<br/>
+ *
+ * @return true if the setup task for the job has completed.
+ */
+ boolean isSetupFinished();
+
+ /**
+ * Gets list of blacklisted trackers for the particular job. <br/>
+ *
+ * @return list of blacklisted tracker names.
+ */
+ List<String> getBlackListedTrackers();
+
+ /**
+ * Gets if the history file of the job is copied to the done
+ * location <br/>
+ *
+ * @return true if history file copied.
+ */
+ boolean isHistoryFileCopied();
+}
\ No newline at end of file
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.AbstractDaemonCluster;
+import org.apache.hadoop.test.system.process.ClusterProcessManager;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
+import org.apache.hadoop.test.system.process.MultiUserHadoopDaemonRemoteCluster;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster.HadoopDaemonInfo;
+
+/**
+ * Concrete AbstractDaemonCluster representing a Map-Reduce cluster.
+ *
+ */
+@SuppressWarnings("unchecked")
+public class MRCluster extends AbstractDaemonCluster {
+
+ private static final Log LOG = LogFactory.getLog(MRCluster.class);
+ public static final String CLUSTER_PROCESS_MGR_IMPL =
+ "test.system.mr.clusterprocess.impl.class";
+
+ /**
+ * Key is used to to point to the file containing hostnames of tasktrackers
+ */
+ public static final String CONF_HADOOP_TT_HOSTFILE_NAME =
+ "test.system.hdrc.tt.hostfile";
+
+ private static List<HadoopDaemonInfo> mrDaemonInfos =
+ new ArrayList<HadoopDaemonInfo>();
+ private static String TT_hostFileName;
+ private static String jtHostName;
+ private static final String SYSTEM_TEST_FILE = "system-test.xml";
+
+ protected enum Role {JT, TT};
+
+ static{
+ Configuration.addDefaultResource("mapred-default.xml");
+ Configuration.addDefaultResource("mapred-site.xml");
+ }
+
+ private MRCluster(Configuration conf, ClusterProcessManager rCluster)
+ throws IOException {
+ super(conf, rCluster);
+ }
+
+  /**
+   * Factory method to create an instance of the Map-Reduce cluster.<br/>
+   * Adds {@code system-test.xml} as a resource of the passed configuration,
+   * registers one jobtracker and one tasktracker daemon description and
+   * instantiates the cluster process manager named by
+   * {@link #CLUSTER_PROCESS_MGR_IMPL} ({@link MRProcessManager} when the key
+   * is unset or empty).
+   *
+   * @param conf
+   *          contains all required parameter to create cluster.
+   * @return a cluster instance to be managed.
+   * @throws Exception
+   *           if the JobTracker IPC address is not configured, or the
+   *           process manager cannot be loaded, instantiated or initialized.
+   */
+  public static MRCluster createCluster(Configuration conf)
+      throws Exception {
+    conf.addResource(SYSTEM_TEST_FILE);
+    TT_hostFileName = conf.get(CONF_HADOOP_TT_HOSTFILE_NAME, "slaves");
+    String jtHostPort = conf.get(JTConfig.JT_IPC_ADDRESS);
+    if (jtHostPort == null) {
+      // Leading space added: the message used to read "<key>is not set".
+      throw new Exception(JTConfig.JT_IPC_ADDRESS + " is not set or "
+          + SYSTEM_TEST_FILE + " hasn't been found.");
+    }
+    // Only the host part of "host:port" is needed here.
+    jtHostName = jtHostPort.trim().split(":")[0];
+
+    // NOTE(review): mrDaemonInfos is static and only ever appended to, so a
+    // second createCluster() call in the same JVM appears to register the
+    // daemons twice -- confirm whether that can happen in practice.
+    mrDaemonInfos.add(new HadoopDaemonInfo("jobtracker",
+        Role.JT, Arrays.asList(new String[]{jtHostName})));
+    mrDaemonInfos.add(new HadoopDaemonInfo("tasktracker",
+        Role.TT, TT_hostFileName));
+
+    String implKlass = conf.get(CLUSTER_PROCESS_MGR_IMPL);
+    if (implKlass == null || implKlass.isEmpty()) {
+      implKlass = MRProcessManager.class.getName();
+    }
+    Class<ClusterProcessManager> klass = (Class<ClusterProcessManager>) Class
+        .forName(implKlass);
+    ClusterProcessManager clusterProcessMgr = klass.newInstance();
+    LOG.info("Created ClusterProcessManager as " + implKlass);
+    clusterProcessMgr.init(conf);
+    return new MRCluster(conf, clusterProcessMgr);
+  }
+
+ protected JTClient createJTClient(RemoteProcess jtDaemon)
+ throws IOException {
+ return new JTClient(getConf(), jtDaemon);
+ }
+
+ protected TTClient createTTClient(RemoteProcess ttDaemon)
+ throws IOException {
+ return new TTClient(getConf(), ttDaemon);
+ }
+
+ public JTClient getJTClient() {
+ Iterator<AbstractDaemonClient> it = getDaemons().get(Role.JT).iterator();
+ return (JTClient) it.next();
+ }
+
+ public List<TTClient> getTTClients() {
+ return (List) getDaemons().get(Role.TT);
+ }
+
+ public TTClient getTTClient(String hostname) {
+ for (TTClient c : getTTClients()) {
+ if (c.getHostName().equals(hostname)) {
+ return c;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void ensureClean() throws IOException {
+ //TODO: ensure that no jobs/tasks are running
+ //restart the cluster if cleanup fails
+ JTClient jtClient = getJTClient();
+ JobInfo[] jobs = jtClient.getProxy().getAllJobInfo();
+ for(JobInfo job : jobs) {
+ jtClient.killJob(
+ org.apache.hadoop.mapred.JobID.downgrade(job.getID()));
+ }
+ }
+
+  /**
+   * Creates the daemon client matching the role of the given process: a
+   * {@link JTClient} for {@code Role.JT}, a {@link TTClient} for
+   * {@code Role.TT}.
+   *
+   * @param process
+   *          the remote process for which a client is required.
+   * @return the client for the process.
+   * @throws IOException
+   *           if the role of the process is neither JT nor TT, or if the
+   *           client cannot be created.
+   */
+  @Override
+  protected AbstractDaemonClient createClient(
+      RemoteProcess process) throws IOException {
+    if (Role.JT.equals(process.getRole())) {
+      return createJTClient(process);
+    }
+    if (Role.TT.equals(process.getRole())) {
+      return createTTClient(process);
+    }
+    String message = "Role: " + process.getRole() + " is not "
+        + "applicable to MRCluster";
+    throw new IOException(message);
+  }
+
+ public static class MRProcessManager extends HadoopDaemonRemoteCluster{
+ public MRProcessManager() {
+ super(mrDaemonInfos);
+ }
+ }
+
+ public static class MultiMRProcessManager
+ extends MultiUserHadoopDaemonRemoteCluster {
+ public MultiMRProcessManager() {
+ super(mrDaemonInfos);
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * Base class for JobTracker and TaskTracker clients.
+ */
+public abstract class MRDaemonClient<PROXY extends DaemonProtocol>
+ extends AbstractDaemonClient<PROXY>{
+
+ public MRDaemonClient(Configuration conf, RemoteProcess process)
+ throws IOException {
+ super(conf, process);
+ }
+
+ public String[] getMapredLocalDirs() throws IOException {
+ return getProxy().getDaemonConf().getStrings(MRConfig.LOCAL_DIR);
+ }
+
+ public String getLogDir() throws IOException {
+ return getProcessInfo().getSystemProperties().get("hadoop.log.dir");
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * TaskTracker client for system tests. The class assumes that the
+ * configuration key {@code TTConfig.TT_REPORT_ADDRESS} is set; only the
+ * port portion of that address is used, and it is combined with the host
+ * name of this client to build the address to connect to.
+ */
+public class TTClient extends MRDaemonClient<TTProtocol> {
+
+ TTProtocol proxy;
+ private static final String SYSTEM_TEST_FILE = "system-test.xml";
+
+ public TTClient(Configuration conf, RemoteProcess daemon)
+ throws IOException {
+ super(conf, daemon);
+ }
+
+ @Override
+ public synchronized void connect() throws IOException {
+ if (isConnected()) {
+ return;
+ }
+ String sockAddrStr = getConf().get(TTConfig.TT_REPORT_ADDRESS);
+ if (sockAddrStr == null) {
+ throw new IllegalArgumentException(
+ "TaskTracker report address is not set");
+ }
+ String[] splits = sockAddrStr.split(":");
+ if (splits.length != 2) {
+ throw new IllegalArgumentException(TTConfig.TT_REPORT_ADDRESS
+ + " is not correctly configured or "
+ + SYSTEM_TEST_FILE + " hasn't been found.");
+ }
+ String port = splits[1];
+ String sockAddr = getHostName() + ":" + port;
+ InetSocketAddress bindAddr = NetUtils.createSocketAddr(sockAddr);
+ proxy = (TTProtocol) RPC.getProxy(TTProtocol.class, TTProtocol.versionID,
+ bindAddr, getConf());
+ setConnected(true);
+ }
+
+  /**
+   * Stops the RPC proxy, if one has been created, and marks this client as
+   * disconnected so that a later {@link #connect()} creates a new proxy
+   * instead of returning early. <br/>
+   *
+   * @throws IOException
+   */
+  @Override
+  public synchronized void disconnect() throws IOException {
+    if (proxy == null) {
+      // connect() has not been called yet; there is nothing to stop.
+      return;
+    }
+    try {
+      RPC.stopProxy(proxy);
+    } finally {
+      // Reset the flag even if stopping fails, otherwise connect() would
+      // keep returning early with a stopped proxy.
+      setConnected(false);
+    }
+  }
+
+ @Override
+ public synchronized TTProtocol getProxy() {
+ return proxy;
+ }
+
+ /**
+ * Gets the last sent status to the {@link JobTracker}. <br/>
+ *
+ * @return the task tracker status.
+ * @throws IOException
+ */
+ public TaskTrackerStatus getStatus() throws IOException {
+ return getProxy().getStatus();
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+
+/**
+ * TaskTracker state information as seen by the JobTracker.
+ */
+public interface TTInfo extends Writable {
+ /**
+ * Gets the {@link TaskTracker} name.<br/>
+ *
+ * @return name of the tracker.
+ */
+ String getName();
+
+ /**
+ * Gets the current status of the {@link TaskTracker} <br/>
+ *
+ * @return status of the {@link TaskTracker}
+ */
+ TaskTrackerStatus getStatus();
+}
\ No newline at end of file
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+import java.io.IOException;
+
+/**
+ * TaskTracker RPC interface to be used for cluster tests.
+ *
+ * The protocol has to be annotated so KerberosInfo can be filled in during
+ * creation of an ipc.Client connection.
+ */
+@KerberosInfo(
+ serverPrincipal = TaskTracker.TT_USER_NAME)
+@TokenInfo(JobTokenSelector.class)
+public interface TTProtocol extends DaemonProtocol {
+
+ /**
+ * Version identifier of this protocol. Presumably the value reported for
+ * Hadoop RPC version checks; confirm against {@link DaemonProtocol}.
+ */
+ public static final long versionID = 1L;
+ /**
+ * Gets latest status which was sent in heartbeat to the {@link JobTracker}.
+ *
+ * @return status of the TaskTracker daemon
+ * @throws IOException in case of errors
+ */
+ TaskTrackerStatus getStatus() throws IOException;
+
+ /**
+ * Gets list of all the tasks in the {@link TaskTracker}.
+ *
+ * @return list of all the tasks
+ * @throws IOException in case of errors
+ */
+ TTTaskInfo[] getTasks() throws IOException;
+
+ /**
+ * Gets the task associated with the id.
+ *
+ * @param taskID id of the task.
+ *
+ * @return task info as a <code>TTTaskInfo</code>
+ * @throws IOException in case of errors
+ */
+ TTTaskInfo getTask(TaskID taskID) throws IOException;
+
+ /**
+ * Checks whether any process in the process tree of the task is alive.
+ *
+ * @param pid
+ * pid of the task attempt
+ * @return true if task process tree is alive.
+ * @throws IOException in case of errors
+ */
+ boolean isProcessTreeAlive(String pid) throws IOException;
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * Task state information as seen by the TT.
+ */
+public interface TTTaskInfo extends Writable {
+
+ /**
+ * Has task occupied a slot? A task occupies a slot once it starts localizing
+ * on the {@link TaskTracker}.
+ *
+ * @return true if task has started occupying a slot.
+ */
+ boolean slotTaken();
+
+ /**
+ * Has the task been killed?
+ *
+ * @return true, if task has been killed.
+ */
+ boolean wasKilled();
+
+ /**
+ * Gets the task status associated with the particular task tracker's view
+ * of the task.
+ *
+ * @return status of the particular task
+ */
+ TaskStatus getTaskStatus();
+
+ /**
+ * Gets the configuration object of the task.
+ * @return configuration object of the task.
+ */
+ Configuration getConf();
+
+ /**
+ * Gets the user of the task.
+ * @return user of the task.
+ */
+ String getUser();
+
+ /**
+ * Provides information as to whether the task is the cleanup of a task.
+ * @return true if it is the cleanup of a task.
+ */
+ boolean isTaskCleanupTask();
+
+ /**
+ * Gets the pid of the running task on the task-tracker.
+ *
+ * @return pid of the task.
+ */
+ String getPid();
+}
Added: hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Task state information of a TaskInProgress as seen by the {@link JobTracker}.
+ */
+public interface TaskInfo extends Writable {
+ /**
+ * Gets the task id of the TaskInProgress.
+ *
+ * @return id of the task.
+ */
+ TaskID getTaskID();
+
+ /**
+ * Number of times task attempts have failed for the given TaskInProgress.
+ *
+ * @return number of failed task attempts.
+ */
+ int numFailedAttempts();
+
+ /**
+ * Number of times task attempts have been killed for the given
+ * TaskInProgress.
+ *
+ * @return number of killed task attempts.
+ */
+ int numKilledAttempts();
+
+ /**
+ * Gets the progress of the task, expressed as a fraction in the range
+ * 0.0-1.0.
+ *
+ * @return progress of the task, between 0.0 and 1.0.
+ */
+ double getProgress();
+
+ /**
+ * Number of attempts currently running for the given TaskInProgress.
+ *
+ * @return number of running attempts.
+ */
+ int numRunningAttempts();
+
+ /**
+ * Array of TaskStatus objects that are related to the corresponding
+ * TaskInProgress object. The task status of the tip is only populated
+ * once a tracker reports back the task status.
+ *
+ * @return list of task statuses.
+ */
+ TaskStatus[] getTaskStatus();
+
+ /**
+ * Gets a list of trackers on which the task attempts are scheduled/running.
+ * Can be empty if the task attempt has succeeded.
+ *
+ * @return list of trackers
+ */
+ String[] getTaskTrackers();
+
+ /**
+ * Tells whether the current TaskInProgress is a setup or cleanup tip.
+ *
+ * @return true if setup/cleanup
+ */
+ boolean isSetupOrCleanup();
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+
+import org.junit.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * System tests that drive a MapReduce cluster through the {@link MRCluster}
+ * handle: daemon process information, job submission, file status listing,
+ * task details reported by the TaskTrackers, and a stop/start of the whole
+ * cluster.
+ */
+public class TestCluster {
+
+  private static final Log LOG = LogFactory.getLog(TestCluster.class);
+
+  /** Cluster handle shared by all tests; created in {@link #before()}. */
+  private static MRCluster cluster;
+
+  public TestCluster() throws Exception {
+
+  }
+
+  /**
+   * Creates the shared cluster handle from a default {@link Configuration}
+   * and calls its {@code setUp()} once before any test runs.
+   *
+   * @throws Exception if the handle cannot be created or set up
+   */
+  @BeforeClass
+  public static void before() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+  }
+
+  /**
+   * Calls {@code tearDown()} on the shared cluster handle after all tests.
+   *
+   * @throws Exception if tearing down fails
+   */
+  @AfterClass
+  public static void after() throws Exception {
+    cluster.tearDown();
+  }
+
+  /**
+   * Checks that the JobTracker client and every TaskTracker client return
+   * non-null process info.
+   *
+   * @throws Exception if a daemon cannot be queried
+   */
+  @Test
+  public void testProcessInfo() throws Exception {
+    LOG.info("Process info of JobTracker is : "
+        + cluster.getJTClient().getProcessInfo());
+    Assert.assertNotNull(cluster.getJTClient().getProcessInfo());
+    Collection<TTClient> tts = cluster.getTTClients();
+    for (TTClient tt : tts) {
+      LOG.info("Process info of TaskTracker is : " + tt.getProcessInfo());
+      Assert.assertNotNull(tt.getProcessInfo());
+    }
+  }
+
+  /**
+   * Builds a sleep job with one map and one reduce, hands it to the
+   * JobTracker client's {@code submitAndVerifyJob} and then to
+   * {@code verifyJobHistory}.
+   *
+   * @throws Exception if submission or verification fails
+   */
+  @Test
+  public void testJobSubmission() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    Job rJob = job.createJob(1, 1, 100, 100, 100, 100);
+    rJob = cluster.getJTClient().submitAndVerifyJob(rJob);
+    cluster.getJTClient().verifyJobHistory(rJob.getJobID());
+  }
+
+  /**
+   * Lists file statuses through the JobTracker and TaskTracker clients while
+   * running as the JobTracker daemon user, and checks that none of the
+   * returned statuses is null. A separate cluster handle is connected for the
+   * duration of the check and always disconnected afterwards.
+   *
+   * Not run by JUnit at present: the {@code @Test} annotation below is
+   * commented out.
+   *
+   * @throws Exception if any listing fails
+   */
+  // @Test
+  public void testFileStatus() throws Exception {
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(cluster
+            .getJTClient().getProxy().getDaemonUser());
+    ugi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        MRCluster myCluster = null;
+        try {
+          myCluster = MRCluster.createCluster(cluster.getConf());
+          myCluster.connect();
+          JTClient jt = myCluster.getJTClient();
+          String dir = ".";
+          checkFileStatus(jt.getFileStatus(dir, true));
+          checkFileStatus(jt.listStatus(dir, false, true), dir);
+          for (TTClient tt : myCluster.getTTClients()) {
+            String[] localDirs = tt.getMapredLocalDirs();
+            for (String localDir : localDirs) {
+              checkFileStatus(tt.listStatus(localDir, true, false), localDir);
+              checkFileStatus(tt.listStatus(localDir, true, true), localDir);
+            }
+          }
+          String systemDir = jt.getClient().getSystemDir().toString();
+          checkFileStatus(jt.listStatus(systemDir, false, true), systemDir);
+          checkFileStatus(jt.listStatus(jt.getLogDir(), true, true), jt
+              .getLogDir());
+        } finally {
+          if (myCluster != null) {
+            myCluster.disconnect();
+          }
+        }
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Asserts that a listing is non-null and that each entry in it is non-null,
+   * logging every entry.
+   *
+   * @param fs listing returned for {@code path}
+   * @param path path that was listed; used only in the log message
+   */
+  private void checkFileStatus(FileStatus[] fs, String path) {
+    Assert.assertNotNull(fs);
+    LOG.info("-----Listing for " + path + " " + fs.length);
+    for (FileStatus fz : fs) {
+      checkFileStatus(fz);
+    }
+  }
+
+  /**
+   * Asserts that a single file status is non-null and logs its path,
+   * permission, owner, group and class.
+   *
+   * @param fz file status to check
+   */
+  private void checkFileStatus(FileStatus fz) {
+    Assert.assertNotNull(fz);
+    LOG.info("FileStatus is "
+        + fz.getPath() + " " + fz.getPermission() + " " + fz.getOwner()
+        + " " + fz.getGroup() + " " + fz.getClass());
+  }
+
+  /**
+   * Tells whether a task has reported a usable pid.
+   *
+   * @param pid pid string returned by {@link TTTaskInfo#getPid()}
+   * @return true if {@code pid} is neither null nor empty
+   */
+  private static boolean isPidReported(String pid) {
+    return pid != null && !pid.isEmpty();
+  }
+
+  /**
+   * Test to verify the common properties of tasks.
+   *
+   * Submits a controlled sleep job, and for every non setup/cleanup task
+   * checks the task info reported by the TaskTracker (configuration, user,
+   * progress, pid, run state) before signalling the task to finish. The job
+   * is then killed, and once the task is no longer held by the TaskTracker
+   * the test asserts that the process tree of one recorded task attempt is
+   * gone.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testTaskDetails() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    FinishTaskControlAction.configureControlActionForJob(conf);
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+
+    Job rJob = job.createJob(1, 1, 100, 100, 100, 100);
+    JobClient client = cluster.getJTClient().getClient();
+    rJob.submit();
+    RunningJob rJob1 =
+        client.getJob(org.apache.hadoop.mapred.JobID.downgrade(rJob.getJobID()));
+    JobID id = rJob.getJobID();
+
+    JobInfo jInfo = wovenClient.getJobInfo(id);
+
+    while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+      // A job that already reached a terminal state never becomes RUNNING;
+      // fail instead of polling forever.
+      if (jInfo.getStatus().isJobComplete()) {
+        Assert.fail("Job " + id + " completed before it was seen running.");
+      }
+      Thread.sleep(1000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+
+    LOG.info("Waiting till job starts running one map");
+
+    TaskInfo[] myTaskInfos = wovenClient.getTaskInfo(id);
+    boolean isOneTaskStored = false;
+    String sometaskpid = null;
+    org.apache.hadoop.mapreduce.TaskAttemptID sometaskId = null;
+    TTClient myCli = null;
+    for (TaskInfo info : myTaskInfos) {
+      if (!info.isSetupOrCleanup()) {
+        String[] taskTrackers = info.getTaskTrackers();
+        for (String taskTracker : taskTrackers) {
+          TTInfo ttInfo = wovenClient.getTTInfo(taskTracker);
+          TTClient ttCli = cluster.getTTClient(ttInfo.getStatus().getHost());
+          TaskID taskId = info.getTaskID();
+          TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(taskId);
+          Assert.assertNotNull(ttTaskInfo);
+          Assert.assertNotNull(ttTaskInfo.getConf());
+          Assert.assertNotNull(ttTaskInfo.getUser());
+          Assert.assertTrue(ttTaskInfo.getTaskStatus().getProgress() >= 0.0);
+          Assert.assertTrue(ttTaskInfo.getTaskStatus().getProgress() <= 1.0);
+          // Get the pid of the task attempt. The task need not have
+          // reported the pid of the task by the time we are checking
+          // the pid, so treat a null pid the same as an empty one.
+          String pid = ttTaskInfo.getPid();
+          int i = 1;
+          while (!isPidReported(pid)) {
+            Thread.sleep(1000);
+            LOG.info("Waiting for task to report its pid back");
+            ttTaskInfo = ttCli.getProxy().getTask(taskId);
+            pid = ttTaskInfo.getPid();
+            if (i == 40) {
+              Assert.fail("The task pid not reported for 40 seconds.");
+            }
+            i++;
+          }
+          // Remember the first attempt seen so its process tree can be
+          // checked after the job has been killed.
+          if (!isOneTaskStored) {
+            sometaskpid = pid;
+            sometaskId = ttTaskInfo.getTaskStatus().getTaskID();
+            myCli = ttCli;
+            isOneTaskStored = true;
+          }
+          LOG.info("verified task progress to be between 0 and 1");
+          State state = ttTaskInfo.getTaskStatus().getRunState();
+          if (ttTaskInfo.getTaskStatus().getProgress() < 1.0
+              && ttTaskInfo.getTaskStatus().getProgress() > 0.0) {
+            Assert.assertEquals(TaskStatus.State.RUNNING, state);
+            LOG.info("verified run state as " + state);
+          }
+          FinishTaskControlAction action =
+              new FinishTaskControlAction(org.apache.hadoop.mapred.TaskID
+                  .downgrade(info.getTaskID()));
+          ttCli.getProxy().sendAction(action);
+        }
+      }
+    }
+    rJob.killJob();
+    int i = 1;
+    while (!rJob.isComplete()) {
+      Thread.sleep(1000);
+      if (i == 40) {
+        Assert
+            .fail("The job not completed within 40 seconds after killing it.");
+      }
+      i++;
+    }
+    // Without a recorded attempt there is nothing to look up below; report
+    // that as a test failure rather than a NullPointerException.
+    Assert.assertTrue("No task attempt was found on any TaskTracker for job "
+        + id, isOneTaskStored);
+    TTTaskInfo myTaskInfo = myCli.getProxy().getTask(sometaskId.getTaskID());
+    i = 0;
+    while (myTaskInfo != null && isPidReported(myTaskInfo.getPid())) {
+      LOG.info("sleeping till task is retired from TT memory");
+      Thread.sleep(1000);
+      myTaskInfo = myCli.getProxy().getTask(sometaskId.getTaskID());
+      if (i == 40) {
+        Assert
+            .fail("Task not retired from TT memory within 40 seconds of job completeing");
+      }
+      i++;
+    }
+    Assert.assertFalse(myCli.getProxy().isProcessTreeAlive(sometaskpid));
+  }
+
+  /**
+   * Stops the cluster and waits until the JobTracker and every TaskTracker
+   * stop answering {@code ping()}, then starts the cluster and waits until
+   * all of them answer {@code ping()} again. Every wait polls about once per
+   * second and fails the test once its counter reaches 40.
+   *
+   * @throws Exception if stopping or starting the cluster fails
+   */
+  @Test
+  public void testClusterRestart() throws Exception {
+    cluster.stop();
+    // Give the cluster time to stop the whole cluster.
+    AbstractDaemonClient cli = cluster.getJTClient();
+    int i = 1;
+    while (i < 40) {
+      try {
+        cli.ping();
+        Thread.sleep(1000);
+        i++;
+      } catch (Exception e) {
+        // Any exception while pinging is taken to mean the daemon is down.
+        break;
+      }
+    }
+    if (i >= 40) {
+      Assert.fail("JT on " + cli.getHostName() + " Should have been down.");
+    }
+    for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+      i = 1;
+      while (i < 40) {
+        try {
+          tcli.ping();
+          Thread.sleep(1000);
+          i++;
+        } catch (Exception e) {
+          // Any exception while pinging is taken to mean the daemon is down.
+          break;
+        }
+      }
+      if (i >= 40) {
+        Assert.fail("TT on " + tcli.getHostName() + " Should have been down.");
+      }
+    }
+    cluster.start();
+    cli = cluster.getJTClient();
+    i = 1;
+    while (i < 40) {
+      try {
+        cli.ping();
+        break;
+      } catch (Exception e) {
+        i++;
+        Thread.sleep(1000);
+        LOG.info("Waiting for Jobtracker on host : "
+            + cli.getHostName() + " to come up.");
+      }
+    }
+    if (i >= 40) {
+      Assert.fail("JT on " + cli.getHostName() + " Should have been up.");
+    }
+    for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+      i = 1;
+      while (i < 40) {
+        try {
+          tcli.ping();
+          break;
+        } catch (Exception e) {
+          i++;
+          Thread.sleep(1000);
+          LOG.info("Waiting for Tasktracker on host : "
+              + tcli.getHostName() + " to come up.");
+        }
+      }
+      if (i >= 40) {
+        Assert.fail("TT on " + tcli.getHostName() + " Should have been Up.");
+      }
+    }
+  }
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * System test for a job whose tasks are configured through
+ * {@link FinishTaskControlAction}: the single map is expected to stay
+ * running until a finish action is sent to the TaskTrackers, after which the
+ * job has to complete.
+ */
+public class TestControlledJob {
+  private MRCluster cluster;
+
+  private static final Log LOG = LogFactory.getLog(TestControlledJob.class);
+
+  /**
+   * Creates the cluster handle from a default {@link Configuration}.
+   *
+   * @throws Exception if the cluster handle cannot be created
+   */
+  public TestControlledJob() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+  }
+
+  /**
+   * Calls {@code setUp()} on the cluster handle before each test.
+   *
+   * @throws Exception if set up fails
+   */
+  @Before
+  public void before() throws Exception {
+    cluster.setUp();
+  }
+
+  /**
+   * Calls {@code tearDown()} on the cluster handle after each test.
+   *
+   * @throws Exception if tear down fails
+   */
+  @After
+  public void after() throws Exception {
+    cluster.tearDown();
+  }
+
+  /**
+   * Submits a sleep job with one map and no reduces, checks that exactly one
+   * map is reported running on two consecutive polls, sends a
+   * {@link FinishTaskControlAction} for every task to every TaskTracker and
+   * then waits about 40 seconds for the job to complete.
+   *
+   * @throws Exception if the job cannot be submitted or queried
+   */
+  @Test
+  public void testControlledJob() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+    FinishTaskControlAction.configureControlActionForJob(conf);
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+
+    Job slpJob = job.createJob(1, 0, 100, 100, 100, 100);
+    slpJob.submit();
+    JobClient client = cluster.getJTClient().getClient();
+
+    RunningJob rJob =
+        client.getJob(org.apache.hadoop.mapred.JobID.downgrade(slpJob
+            .getJobID()));
+    JobID id = rJob.getID();
+
+    JobInfo jInfo = wovenClient.getJobInfo(id);
+
+    while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+      // A job that already reached a terminal state never becomes RUNNING;
+      // fail instead of polling forever.
+      if (jInfo.getStatus().isJobComplete()) {
+        Assert.fail("Controlled Job with ID : " + id
+            + " finished before it was seen running.");
+      }
+      Thread.sleep(1000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+
+    LOG.info("Waiting till job starts running one map");
+    jInfo = wovenClient.getJobInfo(id);
+    // Expected value goes first so a failure message reads correctly.
+    Assert.assertEquals(1, jInfo.runningMaps());
+
+    LOG.info("waiting for another cycle to "
+        + "check if the maps dont finish off");
+    Thread.sleep(1000);
+    jInfo = wovenClient.getJobInfo(id);
+    Assert.assertEquals(1, jInfo.runningMaps());
+
+    TaskInfo[] taskInfos = wovenClient.getTaskInfo(id);
+
+    for (TaskInfo info : taskInfos) {
+      LOG.info("constructing control action to signal task to finish");
+      FinishTaskControlAction action =
+          new FinishTaskControlAction(TaskID.downgrade(info.getTaskID()));
+      // The action is sent to every TaskTracker, not only the one that
+      // runs the task.
+      for (TTClient cli : cluster.getTTClients()) {
+        cli.getProxy().sendAction(action);
+      }
+    }
+
+    jInfo = wovenClient.getJobInfo(id);
+    int i = 1;
+    // A null job info ends the wait, exactly as a completed status does.
+    // The state is re-checked before the timeout fires so that a job which
+    // finishes on the last poll is not reported as a failure.
+    while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+      if (i > 40) {
+        Assert.fail("Controlled Job with ID : "
+            + jInfo.getID()
+            + " has not completed in 40 seconds after signalling.");
+      }
+      Thread.sleep(1000);
+      jInfo = wovenClient.getJobInfo(id);
+      i++;
+    }
+    LOG.info("Job sucessfully completed after signalling!!!!");
+  }
+}