You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:31:32 UTC
svn commit: r1077582 - in
/hadoop/common/branches/branch-0.20-security-patches/src/test/system:
aop/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapred/
java/org/apache/hadoop/mapreduce/test/system/ java/shared/ java/shared/org/
java/shared/org/apa...
Author: omalley
Date: Fri Mar 4 04:31:31 2011
New Revision: 1077582
URL: http://svn.apache.org/viewvc?rev=1077582&view=rev
Log:
commit 8d4bb100904e189de9c215c52012303e9a18a1b1
Author: Iyappan Srinivasan <iy...@yahoo-inc.com>
Date: Fri Jul 23 06:27:13 2010 +0000
MAPREDUCE-1871 https://issues.apache.org/jira/secure/attachment/12450270/1871-ydist-security-patch.txt
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj
hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj?rev=1077582&r1=1077581&r2=1077582&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj Fri Mar 4 04:31:31 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.test.
import org.apache.hadoop.mapreduce.test.system.JobInfo;
import org.apache.hadoop.mapreduce.test.system.TTInfo;
import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapred.StatisticsCollectionHandler;
/**
* Aspect which injects the basic protocol functionality which is to be
@@ -91,5 +92,25 @@ public aspect JTProtocolAspect {
public String JTProtocol.getJobSummaryInfo(JobID jobId) throws IOException {
return null;
}
+
+ public int JTProtocol.getTaskTrackerLevelStatistics(TaskTrackerStatus
+ ttStatus, String timePeriod, String totalTasksOrSucceededTasks)
+ throws IOException {
+ return 0; // stub body injected into JTProtocol: ignores its arguments, always 0
+ }
+
+ public int JTProtocol.getInfoFromAllClients(String timePeriod,
+ String totalTasksOrSucceededTasks) throws IOException {
+ return 0; // stub body injected into JTProtocol: ignores its arguments, always 0
+ }
+ public StatisticsCollectionHandler JTProtocol.
+ getInfoFromAllClientsForAllTaskType() throws Exception {
+ return null; // stub body injected into JTProtocol: no handler is built here
+ }
+
+ public int JTProtocol.getTaskTrackerHeartbeatInterval()
+ throws Exception {
+ return -1; // stub body injected into JTProtocol: -1 stands in for "no interval"
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj?rev=1077582&r1=1077581&r2=1077582&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj Fri Mar 4 04:31:31 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -31,6 +32,9 @@ import org.apache.hadoop.mapred.Counters
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapred.StatisticsCollector;
+import org.apache.hadoop.mapred.StatisticsCollectionHandler;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.test.system.JTProtocol;
import org.apache.hadoop.mapreduce.test.system.JobInfo;
@@ -340,4 +344,177 @@ public privileged aspect JobTrackerAspec
jobSummary.append(tracker.getClusterMetrics().getReduceSlotCapacity());
return jobSummary.toString();
}
+
+ /**
+ * This gets the value of one task tracker window in the tasktracker page.
+ *
+ * @param ttStatus status of the tasktracker whose window is read
+ * @param timePeriod one of "since_start", "last_day" or "last_hour"
+ * @param totalTasksOrSucceededTasks "total_tasks" or "succeeded_tasks"
+ * @return The number of tasks info in a particular window in
+ * tasktracker page.
+ */
+ public int JobTracker.getTaskTrackerLevelStatistics(
+ TaskTrackerStatus ttStatus, String timePeriod,
+ String totalTasksOrSucceededTasks) throws IOException {
+
+ LOG.info("ttStatus host :" + ttStatus.getHost());
+ if (timePeriod.matches("since_start")) {
+ StatisticsCollector.TimeWindow window = getStatistics().
+ collector.DEFAULT_COLLECT_WINDOWS[0];
+ return(getNumberOfTasks(window, ttStatus ,
+ totalTasksOrSucceededTasks));
+ } else if (timePeriod.matches("last_day")) {
+ StatisticsCollector.TimeWindow window = getStatistics().
+ collector.DEFAULT_COLLECT_WINDOWS[1];
+ return(getNumberOfTasks(window, ttStatus,
+ totalTasksOrSucceededTasks));
+ } else if (timePeriod.matches("last_hour")) {
+ StatisticsCollector.TimeWindow window = getStatistics().
+ collector.DEFAULT_COLLECT_WINDOWS[2];
+ return(getNumberOfTasks(window, ttStatus ,
+ totalTasksOrSucceededTasks));
+ }
+ return -1;
+ }
+
+  /**
+   * Sums one tasktracker-page statistic (one time-period / task-type box)
+   * over every known tasktracker.
+   *
+   * @param timePeriod "since_start", "last_day" or "last_hour"
+   * @param totalTasksOrSucceededTasks "total_tasks" or "succeeded_tasks"
+   * @return the sum of that statistic over all tasktrackers (0 when there
+   *         are none), or -1 when the period or the statistic name is not
+   *         recognised
+   * @throws IOException not raised by the code visible here; declared to
+   *         match the JTProtocol signature
+   */
+  public int JobTracker.getInfoFromAllClients(String timePeriod,
+      String totalTasksOrSucceededTasks) throws IOException {
+
+    int totalTasksCount = 0;
+    for (TaskTracker tt : taskTrackers.values()) {
+      int tasksOnTracker = getTaskTrackerLevelStatistics(
+          tt.getStatus(), timePeriod, totalTasksOrSucceededTasks);
+      // -1 is the "unknown period/statistic" marker, not a count; adding
+      // it up would hand the caller a meaningless negative total.
+      if (tasksOnTracker < 0) {
+        return -1;
+      }
+      totalTasksCount += tasksOnTracker;
+    }
+    return totalTasksCount;
+  }
+
+  // Looks up one statistic of one tasktracker for the given time window.
+  private int JobTracker.getNumberOfTasks(StatisticsCollector.TimeWindow
+      window, TaskTrackerStatus ttStatus, String totalTasksOrSucceededTasks ) {
+    JobTrackerStatistics.TaskTrackerStat stat = getStatistics()
+        .getTaskTrackerStat(ttStatus.getTrackerName());
+    if (totalTasksOrSucceededTasks.equals("total_tasks")) {
+      return stat.totalTasksStat.getValues().get(window).getValue();
+    }
+    if (totalTasksOrSucceededTasks.equals("succeeded_tasks")) {
+      return stat.succeededTasksStat.getValues().get(window).getValue();
+    }
+    // Any other selector has no matching statistic.
+    return -1;
+  }
+
+ /**
+ * This gets the value of all task trackers windows in the tasktracker page.
+ *
+ * @param none,
+ * @return StatisticsCollectionHandler class which holds the number
+ * of all jobs ran from all tasktrackers, in the sequence given below
+ * "since_start - total_tasks"
+ * "since_start - succeeded_tasks"
+ * "last_hour - total_tasks"
+ * "last_hour - succeeded_tasks"
+ * "last_day - total_tasks"
+ * "last_day - succeeded_tasks"
+ */
+ public StatisticsCollectionHandler JobTracker.
+ getInfoFromAllClientsForAllTaskType() throws Exception {
+
+ //The outer list will have a list of each tasktracker list.
+ //The inner list will have a list of all number of tasks in
+ //one tasktracker.
+ List<List<Integer>> ttInfoList = new LinkedList<List<Integer>>();
+
+ // Go through each tasktracker and get all the number of tasks
+ // six window's values of that tasktracker.Each window points to
+ // specific value for that tasktracker.
+ //"since_start - total_tasks"
+ //"since_start - succeeded_tasks"
+ //"last_hour - total_tasks"
+ //"last_hour - succeeded_tasks"
+ //"last_day - total_tasks"
+ //"last_day - succeeded_tasks"
+
+ for (TaskTracker tt : taskTrackers.values()) {
+ TaskTrackerStatus ttStatus = tt.getStatus();
+ String tasktrackerName = ttStatus.getHost();
+ List<Integer> taskTrackerValues = new LinkedList<Integer>();
+ JobTrackerStatistics.TaskTrackerStat ttStat = getStatistics().
+ getTaskTrackerStat(ttStatus.getTrackerName());
+
+ int value;
+ int totalCount = 0;
+ for (int i = 0; i < 3; i++) {
+ StatisticsCollector.TimeWindow window = getStatistics().
+ collector.DEFAULT_COLLECT_WINDOWS[i];
+ value=0;
+ value = ttStat.totalTasksStat.getValues().
+ get(window).getValue();
+ taskTrackerValues.add(value);
+ value=0;
+ value = ttStat.succeededTasksStat.getValues().
+ get(window).getValue();
+ taskTrackerValues.add(value);
+ }
+ ttInfoList.add(taskTrackerValues);
+ }
+
+ //The info is collected in the order described above by going
+ //through each tasktracker list
+ int totalInfoValues = 0;
+ StatisticsCollectionHandler statisticsCollectionHandler =
+ new StatisticsCollectionHandler();
+ for (int i = 0; i < 6; i++) {
+ totalInfoValues = 0;
+ for (int j = 0; j < ttInfoList.size(); j++) {
+ List<Integer> list = ttInfoList.get(j);
+ totalInfoValues += list.get(i);
+ }
+ switch (i) {
+ case 0: statisticsCollectionHandler.
+ setSinceStartTotalTasks(totalInfoValues);
+ break;
+ case 1: statisticsCollectionHandler.
+ setSinceStartSucceededTasks(totalInfoValues);
+ break;
+ case 2: statisticsCollectionHandler.
+ setLastHourTotalTasks(totalInfoValues);
+ break;
+ case 3: statisticsCollectionHandler.
+ setLastHourSucceededTasks(totalInfoValues);
+ break;
+ case 4: statisticsCollectionHandler.
+ setLastDayTotalTasks(totalInfoValues);
+ break;
+ case 5: statisticsCollectionHandler.
+ setLastDaySucceededTasks(totalInfoValues);
+ break;
+ }
+ }
+ return statisticsCollectionHandler;
+ }
+
+ /*
+ * Get the tasktracker heartbeat interval by delegating to
+ * getNextHeartbeatInterval().
+ */
+ public int JobTracker.getTaskTrackerHeartbeatInterval()
+ throws Exception {
+ return (getNextHeartbeatInterval());
+ }
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj?rev=1077582&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/StatisticsCollectorAspect.aj Fri Mar 4 04:31:31 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.mapred.StatisticsCollector.TimeWindow;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * This class will change the number of jobs time windows
+ * of all task trackers <br/>
+ * Last Day time window will be changed from 24 hours to 2 minutes <br/>
+ * Last Hour time window will be changed from 1 hour to 1 minute <br/>
+ */
+
+public privileged aspect StatisticsCollectorAspect {
+
+ //"Last Day" window is shortened to 120 seconds instead of 24 hours,
+ //with a 10 seconds refresh rate
+ static final TimeWindow
+ LAST_DAY_ASPECT = new TimeWindow("Last Day", 2 * 60, 10);
+
+ //"Last Hour" window is shortened to 60 seconds instead of 1 hour,
+ //with a 10 seconds refresh rate
+ static final TimeWindow
+ LAST_HOUR_ASPECT = new TimeWindow("Last Hour", 60, 10);
+
+ private static final Log LOG = LogFactory
+ .getLog(StatisticsCollectorAspect.class);
+
+ pointcut createStatExecution(String name, TimeWindow[] window) :
+ call(* StatisticsCollector.createStat(String, TimeWindow[])) //matches call sites, despite "Execution" in the pointcut name
+ && args(name, window);
+
+ //Before every matched createStat call, swap in the shortened windows.
+ before(String name, TimeWindow[] window) : createStatExecution(name, window) {
+ window[1] = LAST_DAY_ASPECT; //slot 1 = last day; overwrites the caller's array in place
+ window[2] = LAST_HOUR_ASPECT; //slot 2 = last hour; assumes the array has at least 3 slots
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java?rev=1077582&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java Fri Mar 4 04:31:31 2011
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapred.JobClient.NetworkedJob;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.Path;
+import testjar.GenerateTaskChildProcess;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Task Tracker Info functionality.
+ */
+
+public class TestTaskTrackerInfoSuccessfulFailedJobs {
+
+ private static MRCluster cluster = null;
+ private static JobClient client = null;
+ static final Log LOG = LogFactory.
+ getLog(TestTaskTrackerInfoSuccessfulFailedJobs.class);
+ private static Configuration conf = null;
+ private static JTProtocol remoteJTClient = null;
+
+ StatisticsCollectionHandler statisticsCollectionHandler = null;
+ int taskTrackerHeartBeatInterval = 0;
+
+ public TestTaskTrackerInfoSuccessfulFailedJobs() throws Exception {
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setUp();
+ conf = new Configuration(cluster.getConf()); // separate copy of the cluster configuration, held in the static field
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
+ remoteJTClient = cluster.getJTClient().getProxy(); // proxy used by the tests for all jobtracker queries
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ cluster.tearDown(); // counterpart of the cluster.setUp() call made in setUp()
+ }
+
+  /**
+   * Verifies the tasktracker summary information after one successful
+   * sleep job, for
+   * since start - total tasks, successful tasks
+   * Last Hour - total tasks, successful tasks
+   * Last Day - total tasks, successful tasks
+   * Every counter is expected to grow by the number of tasks of the job.
+   *
+   * @throws Exception if a remote call fails
+   */
+  @Test
+  public void testTaskTrackerInfoAll() throws Exception {
+
+    String jobTrackerUserName = remoteJTClient.getDaemonUser();
+    LOG.info("jobTrackerUserName is :" + jobTrackerUserName);
+
+    int totalMapTasks = 5;
+    int totalReduceTasks = 1;
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    //Kept in a local: writing the job configuration back into the static
+    //conf field would carry it over into the other test methods.
+    Configuration sleepJobConf = job.setupJobConf(totalMapTasks,
+        totalReduceTasks, 100, 100, 100, 100);
+    JobConf jconf = new JobConf(sleepJobConf);
+
+    //This counter bounds the polling loops, which might otherwise
+    //become infinite.
+    int count = 0;
+
+    //The last hour and last day windows are shortened to 60 seconds and
+    //120 seconds, replacing one hour and 1 day. Wait until the last day
+    //count is back to 0 so that the testcase starts from a fresh window.
+    while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks")
+        != 0) {
+      count++;
+      UtilsForTests.waitFor(1000);
+      //Give up after 180 polls to avoid an infinite loop under
+      //unforeseen circumstances.
+      if (count > 180) {
+        Assert.fail("Since this value has not reached 0 " +
+            "in more than 180 seconds. Failing at this point");
+      }
+    }
+
+    //Snapshot of the six counters before the job runs.
+    statisticsCollectionHandler = remoteJTClient.
+        getInfoFromAllClientsForAllTaskType();
+    int totalTasksSinceStartBeforeJob = statisticsCollectionHandler.
+        getSinceStartTotalTasks();
+    int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler.
+        getSinceStartSucceededTasks();
+    int totalTasksLastHourBeforeJob = statisticsCollectionHandler.
+        getLastHourTotalTasks();
+    int succeededTasksLastHourBeforeJob = statisticsCollectionHandler.
+        getLastHourSucceededTasks();
+    int totalTasksLastDayBeforeJob = statisticsCollectionHandler.
+        getLastDayTotalTasks();
+    int succeededTasksLastDayBeforeJob = statisticsCollectionHandler.
+        getLastDaySucceededTasks();
+
+    //Submitting the job
+    RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+
+    JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID());
+    LOG.info("jInfo is :" + jInfo);
+
+    //Assert if jobInfo is null
+    Assert.assertNotNull("jobInfo is null", jInfo);
+
+    count = 0;
+    LOG.info("Waiting till the job is completed...");
+    while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(1000);
+      count++;
+      jInfo = remoteJTClient.getJobInfo(rJob.getID());
+      //One waitFor(1000) per poll, as in the 180-poll loop above, so the
+      //40-poll limit is reported as 40 seconds.
+      if (count > 40) {
+        Assert.fail("job has not reached completed state for more" +
+            " than 40 seconds. Failing at this point");
+      }
+    }
+
+    //Waiting for 6 times the tasktracker heartbeat interval so that all
+    //the completed tasks are reflected in their corresponding tasktracker
+    //boxes; this accounts for network slowness, job tracker processing
+    //time after receiving the tasktracker updates etc.
+    taskTrackerHeartBeatInterval = remoteJTClient.
+        getTaskTrackerHeartbeatInterval();
+    UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6);
+
+    //Snapshot of the six counters after the job has completed.
+    statisticsCollectionHandler =
+        remoteJTClient.getInfoFromAllClientsForAllTaskType();
+    int totalTasksSinceStartAfterJob = statisticsCollectionHandler.
+        getSinceStartTotalTasks();
+    int succeededTasksSinceStartAfterJob = statisticsCollectionHandler.
+        getSinceStartSucceededTasks();
+    int totalTasksLastHourAfterJob = statisticsCollectionHandler.
+        getLastHourTotalTasks();
+    int succeededTasksLastHourAfterJob = statisticsCollectionHandler.
+        getLastHourSucceededTasks();
+    int totalTasksLastDayAfterJob = statisticsCollectionHandler.
+        getLastDayTotalTasks();
+    int succeededTasksLastDayAfterJob = statisticsCollectionHandler.
+        getLastDaySucceededTasks();
+
+    //Total tasks expected is setup, cleanup + totalMapTasks
+    //+ totalReduceTasks
+    int totalTasksForJob = (totalMapTasks + totalReduceTasks + 2);
+
+    Assert.assertEquals("The number of total tasks, since start" +
+        " dont match",
+        (totalTasksSinceStartBeforeJob + totalTasksForJob),
+        totalTasksSinceStartAfterJob);
+    Assert.assertEquals("The number of succeeded tasks, " +
+        "since start dont match",
+        (succeededTasksSinceStartBeforeJob + totalTasksForJob),
+        succeededTasksSinceStartAfterJob);
+
+    Assert.assertEquals("The number of total tasks, last hour" +
+        " dont match",
+        (totalTasksLastHourBeforeJob + totalTasksForJob),
+        totalTasksLastHourAfterJob);
+    Assert.assertEquals("The number of succeeded tasks, " +
+        "last hour dont match",
+        (succeededTasksLastHourBeforeJob + totalTasksForJob),
+        succeededTasksLastHourAfterJob);
+
+    Assert.assertEquals("The number of total tasks, last day" +
+        " dont match",
+        (totalTasksLastDayBeforeJob + totalTasksForJob),
+        totalTasksLastDayAfterJob);
+    Assert.assertEquals("The number of succeeded tasks, " +
+        "last day dont match",
+        (succeededTasksLastDayBeforeJob + totalTasksForJob),
+        succeededTasksLastDayAfterJob);
+  }
+
+  /**
+   * Verifies the tasktracker summary information when one task attempt
+   * of a sleep job is killed, for
+   * since start - total tasks, successful tasks
+   * Last Hour - total tasks, successful tasks
+   * Last Day - total tasks, successful tasks
+   * The total counters are expected to grow by one more than the
+   * succeeded counters, because of the killed attempt.
+   *
+   * @throws Exception if a remote call fails
+   */
+  @Test
+  public void testTaskTrackerInfoKilled() throws Exception {
+
+    String jobTrackerUserName = remoteJTClient.getDaemonUser();
+    LOG.info("jobTrackerUserName is :" + jobTrackerUserName);
+
+    int totalMapTasks = 5;
+    int totalReduceTasks = 1;
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    //Kept in a local: writing the job configuration back into the static
+    //conf field would carry it over into the other test methods.
+    Configuration sleepJobConf = job.setupJobConf(totalMapTasks,
+        totalReduceTasks, 100, 100, 100, 100);
+    JobConf jconf = new JobConf(sleepJobConf);
+
+    //This counter bounds the polling loops, which might otherwise
+    //become infinite.
+    int count = 0;
+
+    //The last hour and last day windows are shortened to 60 seconds and
+    //120 seconds, replacing one hour and 1 day. Wait until the last day
+    //count is back to 0 so that the testcase starts from a fresh window.
+    while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks")
+        != 0) {
+      count++;
+      UtilsForTests.waitFor(1000);
+      //Give up after 140 polls to avoid an infinite loop under
+      //unforeseen circumstances.
+      if (count > 140) {
+        Assert.fail("Since this value has not reached 0 " +
+            "in more than 140 seconds. Failing at this point");
+      }
+    }
+
+    //Snapshot of the six counters before the job runs.
+    statisticsCollectionHandler = remoteJTClient.
+        getInfoFromAllClientsForAllTaskType();
+    int totalTasksSinceStartBeforeJob = statisticsCollectionHandler.
+        getSinceStartTotalTasks();
+    int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler.
+        getSinceStartSucceededTasks();
+    int totalTasksLastHourBeforeJob = statisticsCollectionHandler.
+        getLastHourTotalTasks();
+    int succeededTasksLastHourBeforeJob = statisticsCollectionHandler.
+        getLastHourSucceededTasks();
+    int totalTasksLastDayBeforeJob = statisticsCollectionHandler.
+        getLastDayTotalTasks();
+    int succeededTasksLastDayBeforeJob = statisticsCollectionHandler.
+        getLastDaySucceededTasks();
+
+    //Submitting the job
+    RunningJob rJob = cluster.getJTClient().getClient().
+        submitJob(jconf);
+
+    JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID());
+    LOG.info("jInfo is :" + jInfo);
+
+    //Checked before the first dereference, and again after every refresh,
+    //so that a missing JobInfo fails with this message and not an NPE.
+    Assert.assertNotNull("jobInfo is null", jInfo);
+
+    //Wait up to 60 polls for the job to be reported as running.
+    count = 0;
+    while (count < 60) {
+      if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+        break;
+      }
+      UtilsForTests.waitFor(1000);
+      jInfo = remoteJTClient.getJobInfo(rJob.getID());
+      Assert.assertNotNull("jobInfo is null", jInfo);
+      count++;
+    }
+    Assert.assertTrue("Job has not been started for 1 min.",
+        count != 60);
+
+    //Choose the task to kill: the last entry that is neither a setup
+    //nor a cleanup task.
+    TaskInfo taskInfo = null;
+    TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(rJob.getID());
+    for (TaskInfo candidate : taskInfos) {
+      if (!candidate.isSetupOrCleanup()) {
+        taskInfo = candidate;
+      }
+    }
+    Assert.assertNotNull("No map or reduce task found for the job",
+        taskInfo);
+
+    //Wait up to 60 polls for the first status of that task to be RUNNING.
+    count = 0;
+    taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+    while (count < 60) {
+      if (taskInfo.getTaskStatus().length > 0
+          && taskInfo.getTaskStatus()[0].getRunState()
+              == TaskStatus.State.RUNNING) {
+        break;
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      count++;
+    }
+    Assert.assertTrue("Task has not been started for 1 min.",
+        count != 60);
+
+    //Kill attempt number 0 of the chosen task.
+    NetworkedJob networkJob = (cluster.getJTClient().getClient()).new
+        NetworkedJob(jInfo.getStatus());
+    TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
+    TaskAttemptID taskAttID = new TaskAttemptID(tID , 0);
+    networkJob.killTask(taskAttID, false);
+
+    count = 0;
+    LOG.info("Waiting till the job is completed...");
+    while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(1000);
+      count++;
+      jInfo = remoteJTClient.getJobInfo(rJob.getID());
+      //One waitFor(1000) per poll, as in the 140-poll loop above, so the
+      //40-poll limit is reported as 40 seconds.
+      if (count > 40) {
+        Assert.fail("job has not reached completed state for more" +
+            " than 40 seconds. Failing at this point");
+      }
+    }
+
+    //Waiting for 6 times the tasktracker heartbeat interval so that all
+    //the completed tasks are reflected in their corresponding tasktracker
+    //boxes; this accounts for network slowness, job tracker processing
+    //time after receiving the tasktracker updates etc.
+    taskTrackerHeartBeatInterval = remoteJTClient.
+        getTaskTrackerHeartbeatInterval();
+    UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6);
+
+    //Snapshot of the six counters after the job has completed.
+    statisticsCollectionHandler =
+        remoteJTClient.getInfoFromAllClientsForAllTaskType();
+    int totalTasksSinceStartAfterJob = statisticsCollectionHandler.
+        getSinceStartTotalTasks();
+    int succeededTasksSinceStartAfterJob = statisticsCollectionHandler.
+        getSinceStartSucceededTasks();
+    int totalTasksLastHourAfterJob = statisticsCollectionHandler.
+        getLastHourTotalTasks();
+    int succeededTasksLastHourAfterJob = statisticsCollectionHandler.
+        getLastHourSucceededTasks();
+    int totalTasksLastDayAfterJob = statisticsCollectionHandler.
+        getLastDayTotalTasks();
+    int succeededTasksLastDayAfterJob = statisticsCollectionHandler.
+        getLastDaySucceededTasks();
+
+    //Total tasks expected is setup, cleanup + totalMapTasks
+    //+ totalReduceTasks
+    int totalTasksForJob = (totalMapTasks + totalReduceTasks + 2);
+
+    //The total tasks will be equal to the totalTasksSinceStartBeforeJob
+    // + totalTasksFor present Job + 1 more task which was killed.
+    //This killed task will be re-attempted by the job tracker and would
+    //have rerun in another tasktracker and would have completed
+    //successfully, which is captured in totalTasksForJob
+    Assert.assertEquals("The number of total tasks, since start" +
+        " dont match",
+        (totalTasksSinceStartBeforeJob + totalTasksForJob + 1),
+        totalTasksSinceStartAfterJob );
+    Assert.assertEquals("The number of succeeded tasks, " +
+        "since start dont match",
+        (succeededTasksSinceStartBeforeJob + totalTasksForJob),
+        succeededTasksSinceStartAfterJob);
+
+    Assert.assertEquals("The number of total tasks, last hour" +
+        " dont match",
+        (totalTasksLastHourBeforeJob + totalTasksForJob + 1),
+        totalTasksLastHourAfterJob);
+    Assert.assertEquals("The number of succeeded tasks, " +
+        "last hour dont match",
+        (succeededTasksLastHourBeforeJob + totalTasksForJob),
+        succeededTasksLastHourAfterJob);
+
+    Assert.assertEquals("The number of total tasks, last day" +
+        " dont match",
+        (totalTasksLastDayBeforeJob + totalTasksForJob + 1),
+        totalTasksLastDayAfterJob);
+    Assert.assertEquals("The number of succeeded tasks, " +
+        "last day dont match",
+        (succeededTasksLastDayBeforeJob + totalTasksForJob),
+        succeededTasksLastDayAfterJob);
+
+  }
+
+ @Test
+ /**
+ * This tests Task tracker task failure
+ * summary information for
+ * since start - total tasks, successful tasks
+ * Last Hour - total tasks, successful tasks
+ * Last Day - total tasks, successful tasks
+ * @param none
+ * @return void
+ */
+ public void testTaskTrackerInfoTaskFailure() throws Exception {
+
+ //This boolean will decide whether to run job again
+ boolean continueLoop = true;
+
+ //counter for job Loop
+ int countLoop = 0;
+
+ TaskInfo taskInfo = null;
+
+ String jobTrackerUserName = remoteJTClient.getDaemonUser();
+
+ LOG.info("jobTrackerUserName is :" + jobTrackerUserName);
+
+ //This counter will check for count of a loop,
+ //which might become infinite.
+ int count = 0;
+
+ Configuration conf = new Configuration(cluster.getConf());
+ conf.setBoolean("mapreduce.map.output.compress", false);
+ conf.set("mapred.map.output.compression.codec",
+ "org.apache.hadoop.io.compress.DefaultCodec");
+ JobConf jconf = new JobConf(conf);
+ Path inputDir = new Path("input");
+ Path outputDir = new Path("output");
+ cleanup(inputDir, conf);
+ cleanup(outputDir, conf);
+
+ createInput(inputDir, conf);
+ jconf.setJobName("Task Failed job");
+ jconf.setJarByClass(UtilsForTests.class);
+ jconf.setMapperClass(GenerateTaskChildProcess.FailedMapper.class);
+ jconf.setNumMapTasks(1);
+ jconf.setNumReduceTasks(0);
+ jconf.setMaxMapAttempts(1);
+ FileInputFormat.setInputPaths(jconf, inputDir);
+ FileOutputFormat.setOutputPath(jconf, outputDir);
+
+ count = 0;
+ //The last hour and last day windows are shortened to 60 seconds and
+ //120 seconds, replacing one hour and 1 day. Waiting for them to be
+ //in a freshly reset state when the testcase starts.
+ while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks")
+ != 0) {
+ count++;
+ UtilsForTests.waitFor(1000);
+ //If the count goes beyond a point, then break; This is to avoid
+ //infinite loop under unforeseen circumstances. Testcase will
+ //anyway fail later.
+ if (count > 140) {
+ Assert.fail("Since this value has not reached 0" +
+ "in more than 140 seconds. Failing at this point");
+ }
+ }
+
+ statisticsCollectionHandler = null;
+ statisticsCollectionHandler = remoteJTClient.
+ getInfoFromAllClientsForAllTaskType();
+
+ int totalTasksSinceStartBeforeJob = statisticsCollectionHandler.
+ getSinceStartTotalTasks();
+ int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler.
+ getSinceStartSucceededTasks();
+ int totalTasksLastHourBeforeJob = statisticsCollectionHandler.
+ getLastHourTotalTasks();
+ int succeededTasksLastHourBeforeJob = statisticsCollectionHandler.
+ getLastHourSucceededTasks();
+ int totalTasksLastDayBeforeJob = statisticsCollectionHandler.
+ getLastDayTotalTasks();
+ int succeededTasksLastDayBeforeJob = statisticsCollectionHandler.
+ getLastDaySucceededTasks();
+
+ RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+ JobID id = rJob.getID();
+ JobInfo jInfo = remoteJTClient.getJobInfo(id);
+
+ LOG.info("jInfo is :" + jInfo);
+
+ count = 0;
+ while (count < 60) {
+ if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+ break;
+ } else {
+ UtilsForTests.waitFor(1000);
+ jInfo = remoteJTClient.getJobInfo(rJob.getID());
+ }
+ count++;
+ }
+ Assert.assertTrue("Job has not been started for 1 min.",
+ count != 60);
+
+ //Assert if jobInfo is null
+ Assert.assertNotNull("jobInfo is null", jInfo);
+
+ count = 0;
+ LOG.info("Waiting till the job is completed...");
+ while ( jInfo != null && (!jInfo.getStatus().isJobComplete())) {
+ UtilsForTests.waitFor(1000);
+ count++;
+ jInfo = remoteJTClient.getJobInfo(id);
+ //If the count goes beyond a point, then break; This is to avoid
+ //infinite loop under unforeseen circumstances. Testcase will
+ //anyway fail later.
+ if (count > 40) {
+ Assert.fail("job has not reached completed state for more" +
+ " than 400 seconds. Failing at this point");
+ }
+ }
+
+ //Waiting for 20 seconds to make sure that all the completed tasks
+ //are reflected in their corresponding Tasktracker boxes.
+ taskTrackerHeartBeatInterval = remoteJTClient.
+ getTaskTrackerHeartbeatInterval();
+
+ //Waiting for 6 times the Task tracker heart beat interval to
+ //account for network slowness, job tracker processing time
+ //after receiving the tasktracker updates etc.
+ UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6);
+
+ statisticsCollectionHandler = null;
+ statisticsCollectionHandler =
+ remoteJTClient.getInfoFromAllClientsForAllTaskType();
+ int totalTasksSinceStartAfterJob = statisticsCollectionHandler.
+ getSinceStartTotalTasks();
+ int succeededTasksSinceStartAfterJob = statisticsCollectionHandler.
+ getSinceStartSucceededTasks();
+ int totalTasksLastHourAfterJob = statisticsCollectionHandler.
+ getLastHourTotalTasks();
+ int succeededTasksLastHourAfterJob = statisticsCollectionHandler.
+ getLastHourSucceededTasks();
+ int totalTasksLastDayAfterJob = statisticsCollectionHandler.
+ getLastDayTotalTasks();
+ int succeededTasksLastDayAfterJob = statisticsCollectionHandler.
+ getLastDaySucceededTasks();
+
+ //1 map running 4 times before failure, plus sometimes two failures
+ //which are not captured in Job summary, but caught in
+ //tasktracker summary.
+ //0 reduces, setup and cleanup
+ int totalTasksForJob = 4;
+
+ Assert.assertTrue("The number of total tasks, since start" +
+ " dont match", (totalTasksSinceStartAfterJob >=
+ totalTasksSinceStartBeforeJob + totalTasksForJob));
+
+ Assert.assertTrue("The number of succeeded tasks, " +
+ "since start dont match",
+ (succeededTasksSinceStartAfterJob >=
+ succeededTasksSinceStartBeforeJob));
+
+ Assert.assertTrue("The number of total tasks, last hour" +
+ " dont match", (totalTasksLastHourAfterJob >=
+ totalTasksLastHourBeforeJob + totalTasksForJob));
+ Assert.assertTrue("The number of succeeded tasks, " +
+ "last hour dont match", (succeededTasksLastHourAfterJob >=
+ succeededTasksLastHourBeforeJob));
+
+ Assert.assertTrue("The number of total tasks, last day" +
+ " dont match", totalTasksLastDayAfterJob >=
+ totalTasksLastDayBeforeJob + totalTasksForJob);
+ Assert.assertTrue("The number of succeeded tasks, " +
+ "since start dont match", succeededTasksLastDayAfterJob >=
+ succeededTasksLastDayBeforeJob);
+ }
+
+ /**
+  * Creates the input directory in the dfs, opens its permissions to
+  * everyone and writes a "data.txt" file made of a two-line sample text
+  * repeated 1000 * 3000 times.
+  * @param inDir directory to create; "data.txt" is written inside it
+  * @param conf configuration used to resolve the file system of inDir
+  * @throws IOException if the directory cannot be created or writing fails
+  */
+ private void createInput(Path inDir, Configuration conf) throws
+     IOException {
+   String input = "Hadoop is framework for data intensive distributed "
+       + "applications.\n"
+       + "Hadoop enables applications to work with thousands of nodes.";
+   FileSystem fs = inDir.getFileSystem(conf);
+   if (!fs.mkdirs(inDir)) {
+     throw new IOException("Failed to create the input directory:"
+         + inDir.toString());
+   }
+   fs.setPermission(inDir, new FsPermission(FsAction.ALL,
+       FsAction.ALL, FsAction.ALL));
+   DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+   //Close the stream even when a write fails, so it is never leaked.
+   try {
+     for (int i = 0; i < 1000 * 3000; i++) {
+       file.writeBytes(input);
+     }
+   } finally {
+     file.close();
+   }
+ }
+
+ /**
+  * Removes the given directory from the dfs by calling delete(dir, true)
+  * on the file system that owns it.
+  * @param dir directory to delete
+  * @param conf configuration used to resolve the file system of dir
+  * @throws IOException if the file system lookup or the delete fails
+  */
+ private void cleanup(Path dir, Configuration conf) throws
+     IOException {
+   dir.getFileSystem(conf).delete(dir, true);
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java?rev=1077582&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoTTProcess.java Fri Mar 4 04:31:31 2011
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.mapred.StatisticsCollectionHandler;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Task Tracker Info functionality.
+ */
+
+public class TestTaskTrackerInfoTTProcess {
+
+ private static MRCluster cluster = null;
+ private static JobClient client = null;
+ static final Log LOG = LogFactory.
+ getLog(TestTaskTrackerInfoTTProcess.class);
+ private static Configuration conf = null;
+ private static JTProtocol remoteJTClient = null;
+ private static String confFile = "mapred-site.xml";
+ int taskTrackerHeartBeatInterval;
+ StatisticsCollectionHandler statisticsCollectionHandler = null;
+
+ // Nothing to initialise per instance; the constructor body is empty.
+ public TestTaskTrackerInfoTTProcess() throws Exception {
+ }
+
+ // Runs once before all tests: creates and sets up the cluster, takes a
+ // copy of its configuration and obtains the JobTracker proxy.
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setUp();
+ // Work on a copy so the cluster's own configuration object is not modified.
+ conf = new Configuration(cluster.getConf());
+ conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
+ remoteJTClient = cluster.getJTClient().getProxy();
+ }
+
+ /**
+  * Tears the cluster down once all tests have run. The cluster may still
+  * be null when setUp() failed before assigning it, so that case is
+  * skipped instead of raising a NullPointerException on top of the
+  * original failure.
+  * @throws Exception if the cluster tear down fails
+  */
+ @AfterClass
+ public static void tearDown() throws Exception {
+   if (cluster != null) {
+     cluster.tearDown();
+   }
+ }
+
+ @Test
+ /**
+ * This tests Task tracker info when a job with 0 maps and 0 reduces run
+ * summary information for
+ * since start - total tasks, successful tasks
+ * Last Hour - total tasks, successful tasks
+ * Last Day - total tasks, successful tasks
+ * It is checked for multiple job submissions.
+ * @param none
+ * @return void
+ */
+ public void testTaskTrackerInfoZeroMapsZeroReduces() throws Exception {
+
+ TaskInfo taskInfo = null;
+
+ String jobTrackerUserName = remoteJTClient.getDaemonUser();
+
+ LOG.info("jobTrackerUserName is :" + jobTrackerUserName);
+
+ int count = 0;
+
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ int totalMapTasks = 0;
+ int totalReduceTasks = 0;
+ conf = job.setupJobConf(totalMapTasks, totalReduceTasks,
+ 100, 100, 100, 100);
+ JobConf jconf = new JobConf(conf);
+
+ count = 0;
+ //The last hour and last day are given 60 seconds and 120 seconds
+ //recreate values rate, replacing one hour and 1 day. Waiting for
+ //them to be ona just created stage when testacse starts.
+ while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks")
+ != 0) {
+ count++;
+ UtilsForTests.waitFor(1000);
+ //If the count goes beyond a point, then break; This is to avoid
+ //infinite loop under unforeseen circumstances. Testcase will
+ //anyway fail later.
+ if (count > 140) {
+ Assert.fail("Since this value has not reached 0" +
+ "in more than 140 seconds. Failing at this point");
+ }
+ }
+
+ statisticsCollectionHandler = remoteJTClient.
+ getInfoFromAllClientsForAllTaskType();
+
+ int totalTasksSinceStartBeforeJob = statisticsCollectionHandler.
+ getSinceStartTotalTasks();
+ int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler.
+ getSinceStartSucceededTasks();
+ int totalTasksLastHourBeforeJob = statisticsCollectionHandler.
+ getLastHourTotalTasks();
+ int succeededTasksLastHourBeforeJob = statisticsCollectionHandler.
+ getLastHourSucceededTasks();
+ int totalTasksLastDayBeforeJob = statisticsCollectionHandler.
+ getLastDayTotalTasks();
+ int succeededTasksLastDayBeforeJob = statisticsCollectionHandler.
+ getLastDaySucceededTasks();
+
+ //Submitting the job
+ RunningJob rJob = cluster.getJTClient().getClient().
+ submitJob(jconf);
+
+ JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID());
+ LOG.info("jInfo is :" + jInfo);
+
+ count = 0;
+ while (count < 60) {
+ if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+ break;
+ } else {
+ UtilsForTests.waitFor(1000);
+ jInfo = remoteJTClient.getJobInfo(rJob.getID());
+ }
+ count++;
+ }
+ Assert.assertTrue("Job has not been started for 1 min.",
+ count != 60);
+
+ //Assert if jobInfo is null
+ Assert.assertNotNull("jobInfo is null", jInfo);
+
+ count = 0;
+ LOG.info("Waiting till the job is completed...");
+ while (!jInfo.getStatus().isJobComplete()) {
+ UtilsForTests.waitFor(1000);
+ count++;
+ jInfo = remoteJTClient.getJobInfo(rJob.getID());
+ //If the count goes beyond a point, then break; This is to avoid
+ //infinite loop under unforeseen circumstances. Testcase will
+ //anyway fail later.
+ if (count > 40) {
+ Assert.fail("job has not reached completed state for more" +
+ " than 400 seconds. Failing at this point");
+ }
+ }
+
+ //Waiting for 20 seconds to make sure that all the completed tasks
+ //are reflected in their corresponding Tasktracker boxes.
+ taskTrackerHeartBeatInterval = remoteJTClient.
+ getTaskTrackerHeartbeatInterval();
+
+ //Waiting for 6 times the Task tracker heart beat interval to
+ //account for network slowness, job tracker processing time
+ //after receiving the tasktracker updates etc.
+ UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6);
+
+ statisticsCollectionHandler = null;
+ statisticsCollectionHandler =
+ remoteJTClient.getInfoFromAllClientsForAllTaskType();
+ int totalTasksSinceStartAfterJob = statisticsCollectionHandler.
+ getSinceStartTotalTasks();
+ int succeededTasksSinceStartAfterJob = statisticsCollectionHandler.
+ getSinceStartSucceededTasks();
+ int totalTasksLastHourAfterJob = statisticsCollectionHandler.
+ getLastHourTotalTasks();
+ int succeededTasksLastHourAfterJob = statisticsCollectionHandler.
+ getLastHourSucceededTasks();
+ int totalTasksLastDayAfterJob = statisticsCollectionHandler.
+ getLastDayTotalTasks();
+ int succeededTasksLastDayAfterJob = statisticsCollectionHandler.
+ getLastDaySucceededTasks();
+
+ int totalTasksForJob = (totalMapTasks + totalReduceTasks + 2);
+
+ Assert.assertEquals("The number of total tasks, since start" +
+ " dont match",
+ (totalTasksSinceStartBeforeJob + totalTasksForJob),
+ totalTasksSinceStartAfterJob );
+ Assert.assertEquals("The number of succeeded tasks, " +
+ "since start dont match",
+ (succeededTasksSinceStartBeforeJob + totalTasksForJob),
+ succeededTasksSinceStartAfterJob);
+
+ Assert.assertEquals("The number of total tasks, last hour" +
+ " dont match",
+ (totalTasksLastHourBeforeJob + totalTasksForJob),
+ totalTasksLastHourAfterJob);
+ Assert.assertEquals("The number of succeeded tasks, " +
+ "last hour dont match",
+ (succeededTasksLastHourBeforeJob + totalTasksForJob),
+ succeededTasksLastHourAfterJob);
+
+ Assert.assertEquals("The number of total tasks, last day" +
+ " dont match",
+ (totalTasksLastDayBeforeJob + totalTasksForJob),
+ totalTasksLastDayAfterJob);
+ Assert.assertEquals("The number of succeeded tasks, " +
+ "since start dont match",
+ (succeededTasksLastDayBeforeJob + totalTasksForJob),
+ succeededTasksLastDayAfterJob);
+ }
+
+ /**
+  * Checks the tasktracker summary counters when a tasktracker is killed
+  * and started again while a SleepJob with 5 maps and 1 reduce is
+  * running. The counters are read from all tasktrackers before and after
+  * the job for
+  * since start - total tasks, successful tasks
+  * Last Hour - total tasks, successful tasks
+  * Last Day - total tasks, successful tasks
+  * and every counter must grow by at least (maps + reduces + 2) tasks.
+  * @throws Exception if talking to the cluster fails
+  */
+ @Test
+ public void testTaskTrackerInfoTaskTrackerSuspend() throws Exception {
+
+   String jobTrackerUserName = remoteJTClient.getDaemonUser();
+
+   LOG.info("jobTrackerUserName is :" + jobTrackerUserName);
+
+   SleepJob job = new SleepJob();
+   job.setConf(conf);
+   int totalMapTasks = 5;
+   int totalReduceTasks = 1;
+   //Keep the job configuration local; the shared static conf must stay
+   //untouched so that other tests do not start from this job's settings.
+   Configuration sleepJobConf = job.setupJobConf(totalMapTasks,
+       totalReduceTasks, 100, 100, 100, 100);
+   JobConf jconf = new JobConf(sleepJobConf);
+
+   //The last hour and last day windows are recreated every 60 and 120
+   //seconds here, replacing one hour and one day. Wait for the last day
+   //window to be in a freshly created state before the testcase starts.
+   int count = 0;
+   while (remoteJTClient.getInfoFromAllClients("last_day","total_tasks")
+       != 0) {
+     count++;
+     UtilsForTests.waitFor(1000);
+     //Fail instead of looping forever if the value never drops to 0.
+     if (count > 140) {
+       Assert.fail("Since this value has not reached 0 " +
+           "in more than 140 seconds. Failing at this point");
+     }
+   }
+
+   //Snapshot of the counters before the job is submitted.
+   statisticsCollectionHandler = remoteJTClient.
+       getInfoFromAllClientsForAllTaskType();
+
+   int totalTasksSinceStartBeforeJob = statisticsCollectionHandler.
+       getSinceStartTotalTasks();
+   int succeededTasksSinceStartBeforeJob = statisticsCollectionHandler.
+       getSinceStartSucceededTasks();
+   int totalTasksLastHourBeforeJob = statisticsCollectionHandler.
+       getLastHourTotalTasks();
+   int succeededTasksLastHourBeforeJob = statisticsCollectionHandler.
+       getLastHourSucceededTasks();
+   int totalTasksLastDayBeforeJob = statisticsCollectionHandler.
+       getLastDayTotalTasks();
+   int succeededTasksLastDayBeforeJob = statisticsCollectionHandler.
+       getLastDaySucceededTasks();
+
+   //Submitting the job
+   RunningJob rJob = cluster.getJTClient().getClient().
+       submitJob(jconf);
+
+   JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID());
+   LOG.info("jInfo is :" + jInfo);
+
+   //Poll once a second, for up to a minute, until the job is RUNNING.
+   //jInfo is null-checked here so that a missing JobInfo is reported by
+   //the assertion below rather than as a NullPointerException.
+   count = 0;
+   while (count < 60) {
+     if (jInfo != null
+         && jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+       break;
+     }
+     UtilsForTests.waitFor(1000);
+     jInfo = remoteJTClient.getJobInfo(rJob.getID());
+     count++;
+   }
+
+   //Assert if jobInfo is null
+   Assert.assertNotNull("jobInfo is null", jInfo);
+   Assert.assertTrue("Job has not been started for 1 min.",
+       count != 60);
+
+   //Pick the first task that is neither a setup nor a cleanup task.
+   TaskInfo taskInfo = null;
+   TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(rJob.getID());
+   for (TaskInfo candidate : taskInfos) {
+     if (!candidate.isSetupOrCleanup()) {
+       taskInfo = candidate;
+       break;
+     }
+   }
+   Assert.assertNotNull("No map or reduce task found for the job",
+       taskInfo);
+
+   //Kill the tasktracker of that task and bring it back up again.
+   TTClient ttClient = cluster.getTTClientInstance(taskInfo);
+   Assert.assertNotNull("No tasktracker found for the task", ttClient);
+   ttClient.kill();
+   ttClient.waitForTTStop();
+   ttClient.start();
+   ttClient.waitForTTStart();
+
+   count = 0;
+   LOG.info("Waiting till the job is completed...");
+   while (!jInfo.getStatus().isJobComplete()) {
+     UtilsForTests.waitFor(1000);
+     count++;
+     jInfo = remoteJTClient.getJobInfo(rJob.getID());
+     Assert.assertNotNull("jobInfo is null", jInfo);
+     //Fail instead of looping forever. One iteration waits 1000 via
+     //waitFor, so the cap of 40 iterations is about 40 seconds.
+     if (count > 40) {
+       Assert.fail("job has not reached completed state for more" +
+           " than 40 seconds. Failing at this point");
+     }
+   }
+
+   //Wait for 6 times the tasktracker heartbeat interval so that all
+   //completed tasks are reflected in their tasktracker boxes; this
+   //accounts for network slowness, job tracker processing time
+   //after receiving the tasktracker updates etc.
+   taskTrackerHeartBeatInterval = remoteJTClient.
+       getTaskTrackerHeartbeatInterval();
+   UtilsForTests.waitFor(taskTrackerHeartBeatInterval * 6);
+
+   //Snapshot of the counters after the job.
+   statisticsCollectionHandler =
+       remoteJTClient.getInfoFromAllClientsForAllTaskType();
+   int totalTasksSinceStartAfterJob = statisticsCollectionHandler.
+       getSinceStartTotalTasks();
+   int succeededTasksSinceStartAfterJob = statisticsCollectionHandler.
+       getSinceStartSucceededTasks();
+   int totalTasksLastHourAfterJob = statisticsCollectionHandler.
+       getLastHourTotalTasks();
+   int succeededTasksLastHourAfterJob = statisticsCollectionHandler.
+       getLastHourSucceededTasks();
+   int totalTasksLastDayAfterJob = statisticsCollectionHandler.
+       getLastDayTotalTasks();
+   int succeededTasksLastDayAfterJob = statisticsCollectionHandler.
+       getLastDaySucceededTasks();
+
+   //The extra 2 are presumably the job setup and cleanup tasks - confirm.
+   int totalTasksForJob = (totalMapTasks + totalReduceTasks + 2);
+
+   //Total tasks are compared against the total (not the succeeded)
+   //count taken before the job, like the last hour / last day checks.
+   Assert.assertTrue("The number of total tasks, since start" +
+       " dont match",
+       (totalTasksSinceStartAfterJob >= totalTasksSinceStartBeforeJob
+       + totalTasksForJob));
+
+   Assert.assertTrue("The number of succeeded tasks, " +
+       "since start dont match",
+       (succeededTasksSinceStartAfterJob >= succeededTasksSinceStartBeforeJob
+       + totalTasksForJob));
+
+   Assert.assertTrue("The number of total tasks, last hour" +
+       " dont match",
+       ( totalTasksLastHourAfterJob >= totalTasksLastHourBeforeJob +
+       totalTasksForJob));
+
+   Assert.assertTrue("The number of succeeded tasks, " +
+       "last hour dont match",
+       (succeededTasksLastHourAfterJob >= succeededTasksLastHourBeforeJob +
+       totalTasksForJob));
+
+   Assert.assertTrue("The number of total tasks, last day" +
+       " dont match",
+       (totalTasksLastDayAfterJob >= totalTasksLastDayBeforeJob +
+       totalTasksForJob));
+
+   Assert.assertTrue("The number of succeeded tasks, " +
+       "last day dont match",
+       (succeededTasksLastDayAfterJob >= succeededTasksLastDayBeforeJob +
+       totalTasksForJob));
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java?rev=1077582&r1=1077581&r2=1077582&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java Fri Mar 4 04:31:31 2011
@@ -23,6 +23,8 @@ import java.io.IOException;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapred.StatisticsCollectionHandler;
/**
* Client side API's exposed from JobTracker.
@@ -144,4 +146,44 @@ public interface JTProtocol extends Daem
* @throws IOException if any I/O error occurs.
*/
public String getJobSummaryInfo(JobID jobId) throws IOException;
+
+
+ /**
+ * This gets the value of one task tracker window in the tasktracker page.
+ *
+ * @param ttStatus the tasktracker whose window is read
+ * @param timePeriod the time period which, together with
+ * totalTasksOrSucceededTasks, is required to identify the window
+ * @param totalTasksOrSucceededTasks selects the total tasks or the
+ * succeeded tasks value of the window
+ * @return the value shown in that single tasktracker window
+ * @throws IOException if any I/O error occurs.
+ */
+ public int getTaskTrackerLevelStatistics(TaskTrackerStatus ttStatus,
+ String timePeriod, String totalTasksOrSucceededTasks)
+ throws IOException;
+
+ /**
+ * This gets the values of all task tracker windows in the tasktracker page.
+ *
+ * @return an object which holds all the tasktracker info
+ * @throws Exception if the information cannot be collected
+ */
+ public StatisticsCollectionHandler getInfoFromAllClientsForAllTaskType()
+ throws Exception;
+
+ /**
+ * Get Information for Time Period and TaskType box
+ * from all tasktrackers
+ * @param timePeriod the time period which, together with
+ * totalTasksOrSucceededTasks, is required to identify the window
+ * @param totalTasksOrSucceededTasks selects the total tasks or the
+ * succeeded tasks column
+ * @return The total number of tasks info for a particular column in
+ * tasktracker page.
+ * @throws IOException if any I/O error occurs.
+ */
+ public int getInfoFromAllClients(String timePeriod,
+ String totalTasksOrSucceededTasks) throws IOException;
+
+ /**
+ * This gets the task tracker heartbeat interval.
+ *
+ * @return the heartbeat interval. NOTE(review): the unit is not stated
+ * here; presumably milliseconds - confirm against the implementation.
+ * @throws Exception if the interval cannot be obtained
+ */
+ public int getTaskTrackerHeartbeatInterval() throws Exception;
+
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077582&r1=1077581&r2=1077582&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Fri Mar 4 04:31:31 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.test.system.pro
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskID;
import java.util.Collection;
+import org.apache.hadoop.mapred.UtilsForTests;
/**
* Concrete AbstractDaemonCluster representing a Map-Reduce cluster.
@@ -203,4 +204,34 @@ public class MRCluster extends AbstractD
super(mrDaemonInfos);
}
}
+
+ /**
+  * Looks up the TTClient of the tasktracker that runs the given task. <br/>
+  * If the task has no tasktracker assigned yet, its TaskInfo is fetched
+  * again from the JobTracker up to 60 times, with
+  * UtilsForTests.waitFor(100) before each fetch. The host name is cut out
+  * of the first tracker name: the part after the first "_" and before
+  * the following ":".
+  * @param taskInfo Task Information of the running task
+  * @return TTClient instance, or null if no tasktracker showed up
+  * @throws IOException
+  */
+ public TTClient getTTClientInstance(TaskInfo taskInfo)
+     throws IOException {
+   JTProtocol jtProxy = getJTClient().getProxy();
+   String[] trackers = taskInfo.getTaskTrackers();
+   for (int attempt = 0; attempt < 60 && trackers.length == 0;
+       attempt++) {
+     UtilsForTests.waitFor(100);
+     taskInfo = jtProxy.getTaskInfo(taskInfo.getTaskID());
+     trackers = taskInfo.getTaskTrackers();
+   }
+   if (trackers.length == 0) {
+     return null;
+   }
+   String hostName = trackers[0].split("_")[1].split(":")[0];
+   return getTTClient(hostName);
+ }
+
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java?rev=1077582&r1=1077581&r2=1077582&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java Fri Mar 4 04:31:31 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapred.JobTrack
import org.apache.hadoop.mapred.TaskTrackerStatus;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.UtilsForTests;
@@ -40,6 +42,7 @@ import org.apache.hadoop.mapred.UtilsFor
public class TTClient extends MRDaemonClient<TTProtocol> {
TTProtocol proxy;
+ static final Log LOG = LogFactory.getLog(TTClient.class);
public TTClient(Configuration conf, RemoteProcess daemon)
throws IOException {
@@ -114,4 +117,45 @@ public class TTClient extends MRDaemonCl
return (counter != 60)? true : false;
}
+ /**
+ * Waits till this Tasktracker daemon process is stopped <br/>
+ *
+ * @return void
+ * @throws IOException
+ */
+ public void waitForTTStop() throws IOException {
+ LOG.info("Waiting for Tasktracker:" + getHostName()
+ + " to stop.....");
+ while (true) {
+ try {
+ ping();
+ LOG.debug(getHostName() +" is waiting state to stop.");
+ UtilsForTests.waitFor(10000);
+ } catch (Exception exp) {
+ LOG.info("TaskTracker : " + getHostName() + " is stopped...");
+ break;
+ }
+ }
+ }
+
+ /**
+ * Waits till this Tasktracker daemon process is started <br/>
+ *
+ * @return void
+ * @throws IOException
+ */
+ public void waitForTTStart() throws
+ IOException {
+ LOG.debug("Waiting for Tasktracker:" + getHostName() + " to come up.");
+ while (true) {
+ try {
+ ping();
+ LOG.debug("TaskTracker : " + getHostName() + " is pinging...");
+ break;
+ } catch (Exception exp) {
+ LOG.info(getHostName() + " is waiting to come up.");
+ UtilsForTests.waitFor(10000);
+ }
+ }
+ }
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java?rev=1077582&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/shared/org/apache/hadoop/mapred/StatisticsCollectionHandler.java Fri Mar 4 04:31:31 2011
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/*
+ * Object which stores all the tasktracker info
+ */
+public class StatisticsCollectionHandler implements Writable{
+
+ private int sinceStartTotalTasks = 0;
+ private int sinceStartSucceededTasks = 0;
+ private int lastHourTotalTasks = 0;
+ private int lastHourSucceededTasks = 0;
+ private int lastDayTotalTasks = 0;
+ private int lastDaySucceededTasks = 0;
+
+ public int getSinceStartTotalTasks() {
+ return sinceStartTotalTasks;
+ }
+
+ public int getSinceStartSucceededTasks() {
+ return sinceStartSucceededTasks;
+ }
+
+ public int getLastHourTotalTasks() {
+ return lastHourTotalTasks;
+ }
+
+ public int getLastHourSucceededTasks() {
+ return lastHourSucceededTasks;
+ }
+
+ public int getLastDayTotalTasks() {
+ return lastDayTotalTasks;
+ }
+
+ public int getLastDaySucceededTasks() {
+ return lastDaySucceededTasks;
+ }
+
+ public void setSinceStartTotalTasks(int value) {
+ sinceStartTotalTasks = value;
+ }
+
+ public void setSinceStartSucceededTasks(int value) {
+ sinceStartSucceededTasks = value;
+ }
+
+ public void setLastHourTotalTasks(int value) {
+ lastHourTotalTasks = value;
+ }
+
+ public void setLastHourSucceededTasks(int value) {
+ lastHourSucceededTasks = value;
+ }
+
+ public void setLastDayTotalTasks(int value) {
+ lastDayTotalTasks = value;
+ }
+
+ public void setLastDaySucceededTasks(int value) {
+ lastDaySucceededTasks = value;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ sinceStartTotalTasks = WritableUtils.readVInt(in);
+ sinceStartSucceededTasks = WritableUtils.readVInt(in);
+ lastHourTotalTasks = WritableUtils.readVInt(in);
+ lastHourSucceededTasks = WritableUtils.readVInt(in);
+ lastDayTotalTasks = WritableUtils.readVInt(in);
+ lastDaySucceededTasks = WritableUtils.readVInt(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, sinceStartTotalTasks);
+ WritableUtils.writeVInt(out, sinceStartSucceededTasks);
+ WritableUtils.writeVInt(out, lastHourTotalTasks);
+ WritableUtils.writeVInt(out, lastHourSucceededTasks);
+ WritableUtils.writeVInt(out, lastDayTotalTasks);
+ WritableUtils.writeVInt(out, lastDaySucceededTasks);
+ }
+
+}
+