You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2011/09/08 03:39:23 UTC
svn commit: r1166495 [2/6] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Sep 8 01:39:07 2011
@@ -218,7 +218,14 @@ public class MRApps extends Apps {
private static final String STAGING_CONSTANT = ".staging";
public static Path getStagingAreaDir(Configuration conf, String user) {
return new Path(
- conf.get(MRConstants.APPS_STAGING_DIR_KEY) +
+ conf.get(MRConstants.APPS_STAGING_DIR_KEY) +
Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
}
+
+ public static String getJobFile(Configuration conf, String user,
+ org.apache.hadoop.mapreduce.JobID jobId) {
+ Path jobFile = new Path(MRApps.getStagingAreaDir(conf, user),
+ jobId.toString() + Path.SEPARATOR + MRConstants.JOB_CONF_FILE);
+ return jobFile.toString();
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Thu Sep 8 01:39:07 2011
@@ -139,6 +139,8 @@ message JobReportProto {
optional float setup_progress = 6;
optional int64 start_time = 7;
optional int64 finish_time = 8;
+ optional string user = 9;
+ optional string jobName = 10;
}
enum TaskAttemptCompletionEventStatusProto {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Thu Sep 8 01:39:07 2011
@@ -21,8 +21,11 @@ import junit.framework.Assert;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import org.junit.Test;
public class TestTypeConverter {
@@ -35,8 +38,33 @@ public class TestTypeConverter {
applicationReport.setApplicationId(applicationId);
applicationReport.setState(state);
applicationReport.setStartTime(appStartTime);
- JobStatus jobStatus = TypeConverter.fromYarn(applicationReport);
+ applicationReport.setUser("TestTypeConverter-user");
+ JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile");
Assert.assertEquals(appStartTime, jobStatus.getStartTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
}
+
+ @Test
+ public void testFromYarnApplicationReport() {
+ ApplicationId mockAppId = mock(ApplicationId.class);
+ when(mockAppId.getClusterTimestamp()).thenReturn(12345L);
+ when(mockAppId.getId()).thenReturn(6789);
+
+ ApplicationReport mockReport = mock(ApplicationReport.class);
+ when(mockReport.getTrackingUrl()).thenReturn("dummy-tracking-url");
+ when(mockReport.getApplicationId()).thenReturn(mockAppId);
+ when(mockReport.getState()).thenReturn(ApplicationState.KILLED);
+ when(mockReport.getUser()).thenReturn("dummy-user");
+ when(mockReport.getQueue()).thenReturn("dummy-queue");
+ String jobFile = "dummy-path/job.xml";
+ JobStatus status = TypeConverter.fromYarn(mockReport, jobFile);
+ Assert.assertNotNull("fromYarn returned null status", status);
+ Assert.assertEquals("jobFile set incorrectly", "dummy-path/job.xml", status.getJobFile());
+ Assert.assertEquals("queue set incorrectly", "dummy-queue", status.getQueue());
+ Assert.assertEquals("trackingUrl set incorrectly", "dummy-tracking-url", status.getTrackingUrl());
+ Assert.assertEquals("user set incorrectly", "dummy-user", status.getUsername());
+ Assert.assertEquals("schedulingInfo set incorrectly", "dummy-tracking-url", status.getSchedulingInfo());
+ Assert.assertEquals("jobId set incorrectly", 6789, status.getJobID().getId());
+ Assert.assertEquals("state set incorrectly", JobStatus.State.KILLED, status.getState());
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Sep 8 01:39:07 2011
@@ -18,10 +18,13 @@
package org.apache.hadoop.mapreduce.v2.util;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.MRConstants;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -107,4 +110,14 @@ public class TestMRApps {
@Test(expected=YarnException.class) public void testTaskAttemptIDShort() {
MRApps.toTaskAttemptID("attempt_0_0_0_m_0");
}
+
+ @Test public void testGetJobFileWithUser() {
+ Configuration conf = new Configuration();
+ conf.set(MRConstants.APPS_STAGING_DIR_KEY, "/my/path/to/staging");
+ String jobFile = MRApps.getJobFile(conf, "dummy-user", new JobID("dummy-job", 12345));
+ assertNotNull("getJobFile results in null.", jobFile);
+ assertEquals("jobFile with specified user is not as expected.",
+ "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
+ }
+
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Thu Sep 8 01:39:07 2011
@@ -44,7 +44,7 @@
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
- <version>1.5.2</version>
+ <version>1.5.3</version>
<executions>
<execution>
<phase>generate-sources</phase>
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java Thu Sep 8 01:39:07 2011
@@ -372,7 +372,7 @@ public class Counters
* @param id the id of the counter within the group (0 to N-1)
* @param name the internal name of the counter
* @return the counter for that name
- * @deprecated use {@link findCounter(String, String)} instead
+ * @deprecated use {@link #findCounter(String, String)} instead
*/
@Deprecated
public Counter findCounter(String group, int id, String name) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Thu Sep 8 01:39:07 2011
@@ -49,7 +49,7 @@ import org.apache.hadoop.util.ToolRunner
/**
* <code>JobClient</code> is the primary interface for the user-job to interact
- * with the {@link JobTracker}.
+ * with the cluster.
*
* <code>JobClient</code> provides facilities to submit jobs, track their
* progress, access component-tasks' reports/logs, get the Map-Reduce cluster
@@ -72,7 +72,7 @@ import org.apache.hadoop.util.ToolRunner
* on the distributed file-system.
* </li>
* <li>
- * Submitting the job to the <code>JobTracker</code> and optionally monitoring
+ * Submitting the job to the cluster and optionally monitoring
* it's status.
* </li>
* </ol></p>
@@ -152,7 +152,7 @@ public class JobClient extends CLI {
/**
* We store a JobProfile and a timestamp for when we last
* acquired the job profile. If the job is null, then we cannot
- * perform any of the tasks. The job might be null if the JobTracker
+ * perform any of the tasks. The job might be null if the cluster
* has completely forgotten about the job. (eg, 24 hours after the
* job completes.)
*/
@@ -348,7 +348,7 @@ public class JobClient extends CLI {
}
/**
- * Fetch task completion events from jobtracker for this job.
+ * Fetch task completion events from cluster for this job.
*/
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
int startFrom) throws IOException {
@@ -429,7 +429,7 @@ public class JobClient extends CLI {
/**
* Build a job client with the given {@link JobConf}, and connect to the
- * default {@link JobTracker}.
+ * default cluster.
*
* @param conf the job configuration.
* @throws IOException
@@ -440,7 +440,7 @@ public class JobClient extends CLI {
/**
* Build a job client with the given {@link Configuration},
- * and connect to the default {@link JobTracker}.
+ * and connect to the default cluster.
*
* @param conf the configuration.
* @throws IOException
@@ -450,7 +450,7 @@ public class JobClient extends CLI {
}
/**
- * Connect to the default {@link JobTracker}.
+ * Connect to the default cluster.
* @param conf the job configuration.
* @throws IOException
*/
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Thu Sep 8 01:39:07 2011
@@ -476,7 +476,6 @@ public class JobConf extends Configurati
/**
* Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
- * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes()
*/
@Deprecated
public void deleteLocalFiles() throws IOException {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Sep 8 01:39:07 2011
@@ -1736,6 +1736,7 @@ class MapTask extends Task {
indexCacheList.get(0).writeToFile(
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
}
+ sortPhase.complete();
return;
}
@@ -1776,6 +1777,7 @@ class MapTask extends Task {
} finally {
finalOut.close();
}
+ sortPhase.complete();
return;
}
{
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TIPStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TIPStatus.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TIPStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TIPStatus.java Thu Sep 8 01:39:07 2011
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapred;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-/** The states of a {@link TaskInProgress} as seen by the JobTracker.
+/** The states of a task.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Thu Sep 8 01:39:07 2011
@@ -1124,8 +1124,14 @@ public class Job extends JobContextImpl
IntegerRanges reduceRanges = getProfileTaskRange(false);
int progMonitorPollIntervalMillis =
Job.getProgressPollInterval(clientConf);
- while (!isComplete()) {
- Thread.sleep(progMonitorPollIntervalMillis);
+ /* make sure to report full progress after the job is done */
+ boolean reportedAfterCompletion = false;
+ while (!isComplete() || !reportedAfterCompletion) {
+ if (isComplete()) {
+ reportedAfterCompletion = true;
+ } else {
+ Thread.sleep(progMonitorPollIntervalMillis);
+ }
String report =
(" map " + StringUtils.formatPercent(mapProgress(), 0)+
" reduce " +
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java Thu Sep 8 01:39:07 2011
@@ -43,8 +43,6 @@ import org.apache.hadoop.io.Text;
*
* @see TaskID
* @see TaskAttemptID
- * @see org.apache.hadoop.mapred.JobTracker#getNewJobId()
- * @see org.apache.hadoop.mapred.JobTracker#getStartTime()
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Thu Sep 8 01:39:07 2011
@@ -22,8 +22,7 @@ import org.apache.hadoop.classification.
/**
* Place holder for cluster level configuration keys.
*
- * These keys are used by both {@link JobTracker} and {@link TaskTracker}. The
- * keys should have "mapreduce.cluster." as the prefix.
+ * The keys should have "mapreduce.cluster." as the prefix.
*
*/
@InterfaceAudience.Private
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java Thu Sep 8 01:39:07 2011
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -47,6 +49,7 @@ import org.apache.hadoop.util.StringUtil
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ControlledJob {
+ private static final Log LOG = LogFactory.getLog(ControlledJob.class);
// A job will be in one of the following states
public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
@@ -235,6 +238,17 @@ public class ControlledJob {
job.killJob();
}
+ public synchronized void failJob(String message) throws IOException, InterruptedException {
+ try {
+ if(job != null && this.state == State.RUNNING) {
+ job.killJob();
+ }
+ } finally {
+ this.state = State.FAILED;
+ this.message = message;
+ }
+ }
+
/**
* Check the state of this running job. The state may
* remain the same, become SUCCESS or FAILED.
@@ -322,6 +336,7 @@ public class ControlledJob {
job.submit();
this.state = State.RUNNING;
} catch (Exception ioe) {
+ LOG.info(getJobName()+" got an error while submitting ",ioe);
this.state = State.FAILED;
this.message = StringUtils.stringifyException(ioe);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java Thu Sep 8 01:39:07 2011
@@ -21,13 +21,16 @@ package org.apache.hadoop.mapreduce.lib.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
+import org.apache.hadoop.util.StringUtils;
/**
* This class encapsulates a set of MapReduce jobs and its dependency.
@@ -49,17 +52,16 @@ import org.apache.hadoop.mapreduce.lib.j
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class JobControl implements Runnable {
+ private static final Log LOG = LogFactory.getLog(JobControl.class);
// The thread can be in one of the following state
public static enum ThreadState {RUNNING, SUSPENDED,STOPPED, STOPPING, READY};
private ThreadState runnerState; // the thread state
- private Map<String, ControlledJob> waitingJobs;
- private Map<String, ControlledJob> readyJobs;
- private Map<String, ControlledJob> runningJobs;
- private Map<String, ControlledJob> successfulJobs;
- private Map<String, ControlledJob> failedJobs;
+ private LinkedList<ControlledJob> jobsInProgress = new LinkedList<ControlledJob>();
+ private LinkedList<ControlledJob> successfulJobs = new LinkedList<ControlledJob>();
+ private LinkedList<ControlledJob> failedJobs = new LinkedList<ControlledJob>();
private long nextJobID;
private String groupName;
@@ -69,46 +71,51 @@ public class JobControl implements Runna
* @param groupName a name identifying this group
*/
public JobControl(String groupName) {
- this.waitingJobs = new Hashtable<String, ControlledJob>();
- this.readyJobs = new Hashtable<String, ControlledJob>();
- this.runningJobs = new Hashtable<String, ControlledJob>();
- this.successfulJobs = new Hashtable<String, ControlledJob>();
- this.failedJobs = new Hashtable<String, ControlledJob>();
this.nextJobID = -1;
this.groupName = groupName;
this.runnerState = ThreadState.READY;
}
private static List<ControlledJob> toList(
- Map<String, ControlledJob> jobs) {
+ LinkedList<ControlledJob> jobs) {
ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
synchronized (jobs) {
- for (ControlledJob job : jobs.values()) {
+ for (ControlledJob job : jobs) {
retv.add(job);
}
}
return retv;
}
+ synchronized private List<ControlledJob> getJobsIn(State state) {
+ LinkedList<ControlledJob> l = new LinkedList<ControlledJob>();
+ for(ControlledJob j: jobsInProgress) {
+ if(j.getJobState() == state) {
+ l.add(j);
+ }
+ }
+ return l;
+ }
+
/**
* @return the jobs in the waiting state
*/
public List<ControlledJob> getWaitingJobList() {
- return toList(this.waitingJobs);
+ return getJobsIn(State.WAITING);
}
/**
* @return the jobs in the running state
*/
public List<ControlledJob> getRunningJobList() {
- return toList(this.runningJobs);
+ return getJobsIn(State.RUNNING);
}
/**
* @return the jobs in the ready state
*/
public List<ControlledJob> getReadyJobsList() {
- return toList(this.readyJobs);
+ return getJobsIn(State.READY);
}
/**
@@ -126,34 +133,6 @@ public class JobControl implements Runna
nextJobID += 1;
return this.groupName + this.nextJobID;
}
-
- private static void addToQueue(ControlledJob aJob,
- Map<String, ControlledJob> queue) {
- synchronized(queue) {
- queue.put(aJob.getJobID(), aJob);
- }
- }
-
- private void addToQueue(ControlledJob aJob) {
- Map<String, ControlledJob> queue = getQueue(aJob.getJobState());
- addToQueue(aJob, queue);
- }
-
- private Map<String, ControlledJob> getQueue(State state) {
- Map<String, ControlledJob> retv = null;
- if (state == State.WAITING) {
- retv = this.waitingJobs;
- } else if (state == State.READY) {
- retv = this.readyJobs;
- } else if (state == State.RUNNING) {
- retv = this.runningJobs;
- } else if (state == State.SUCCESS) {
- retv = this.successfulJobs;
- } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
- retv = this.failedJobs;
- }
- return retv;
- }
/**
* Add a new job.
@@ -163,7 +142,7 @@ public class JobControl implements Runna
String id = this.getNextJobID();
aJob.setJobID(id);
aJob.setJobState(State.WAITING);
- this.addToQueue(aJob);
+ jobsInProgress.add(aJob);
return id;
}
@@ -211,47 +190,8 @@ public class JobControl implements Runna
}
}
- synchronized private void checkRunningJobs()
- throws IOException, InterruptedException {
-
- Map<String, ControlledJob> oldJobs = null;
- oldJobs = this.runningJobs;
- this.runningJobs = new Hashtable<String, ControlledJob>();
-
- for (ControlledJob nextJob : oldJobs.values()) {
- nextJob.checkState();
- this.addToQueue(nextJob);
- }
- }
-
- synchronized private void checkWaitingJobs()
- throws IOException, InterruptedException {
- Map<String, ControlledJob> oldJobs = null;
- oldJobs = this.waitingJobs;
- this.waitingJobs = new Hashtable<String, ControlledJob>();
-
- for (ControlledJob nextJob : oldJobs.values()) {
- nextJob.checkState();
- this.addToQueue(nextJob);
- }
- }
-
- synchronized private void startReadyJobs() {
- Map<String, ControlledJob> oldJobs = null;
- oldJobs = this.readyJobs;
- this.readyJobs = new Hashtable<String, ControlledJob>();
-
- for (ControlledJob nextJob : oldJobs.values()) {
- //Submitting Job to Hadoop
- nextJob.submit();
- this.addToQueue(nextJob);
- }
- }
-
synchronized public boolean allFinished() {
- return this.waitingJobs.size() == 0 &&
- this.readyJobs.size() == 0 &&
- this.runningJobs.size() == 0;
+ return jobsInProgress.isEmpty();
}
/**
@@ -262,39 +202,83 @@ public class JobControl implements Runna
* Submit the jobs in ready state
*/
public void run() {
- this.runnerState = ThreadState.RUNNING;
- while (true) {
- while (this.runnerState == ThreadState.SUSPENDED) {
+ try {
+ this.runnerState = ThreadState.RUNNING;
+ while (true) {
+ while (this.runnerState == ThreadState.SUSPENDED) {
+ try {
+ Thread.sleep(5000);
+ }
+ catch (Exception e) {
+ //TODO the thread was interrupted, do something!!!
+ }
+ }
+
+ synchronized(this) {
+ Iterator<ControlledJob> it = jobsInProgress.iterator();
+ while(it.hasNext()) {
+ ControlledJob j = it.next();
+ LOG.debug("Checking state of job "+j);
+ switch(j.checkState()) {
+ case SUCCESS:
+ successfulJobs.add(j);
+ it.remove();
+ break;
+ case FAILED:
+ case DEPENDENT_FAILED:
+ failedJobs.add(j);
+ it.remove();
+ break;
+ case READY:
+ j.submit();
+ break;
+ case RUNNING:
+ case WAITING:
+ //Do Nothing
+ break;
+ }
+ }
+ }
+
+ if (this.runnerState != ThreadState.RUNNING &&
+ this.runnerState != ThreadState.SUSPENDED) {
+ break;
+ }
try {
Thread.sleep(5000);
}
catch (Exception e) {
-
+ //TODO the thread was interrupted, do something!!!
+ }
+ if (this.runnerState != ThreadState.RUNNING &&
+ this.runnerState != ThreadState.SUSPENDED) {
+ break;
}
}
- try {
- checkRunningJobs();
- checkWaitingJobs();
- startReadyJobs();
- } catch (Exception e) {
- this.runnerState = ThreadState.STOPPED;
- }
- if (this.runnerState != ThreadState.RUNNING &&
- this.runnerState != ThreadState.SUSPENDED) {
- break;
- }
- try {
- Thread.sleep(5000);
- }
- catch (Exception e) {
-
- }
- if (this.runnerState != ThreadState.RUNNING &&
- this.runnerState != ThreadState.SUSPENDED) {
- break;
- }
+ }catch(Throwable t) {
+ LOG.error("Error while trying to run jobs.",t);
+ //Mark all jobs as failed because we got something bad.
+ failAllJobs(t);
}
this.runnerState = ThreadState.STOPPED;
}
+ synchronized private void failAllJobs(Throwable t) {
+ String message = "Unexpected System Error Occured: "+
+ StringUtils.stringifyException(t);
+ Iterator<ControlledJob> it = jobsInProgress.iterator();
+ while(it.hasNext()) {
+ ControlledJob j = it.next();
+ try {
+ j.failJob(message);
+ } catch (IOException e) {
+ LOG.error("Error while tyring to clean up "+j.getJobName(), e);
+ } catch (InterruptedException e) {
+ LOG.error("Error while tyring to clean up "+j.getJobName(), e);
+ } finally {
+ failedJobs.add(j);
+ it.remove();
+ }
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Thu Sep 8 01:39:07 2011
@@ -90,7 +90,9 @@ public class CompletedJob implements org
report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
report.setStartTime(jobInfo.getLaunchTime());
report.setFinishTime(jobInfo.getFinishTime());
- //TOODO Possibly populate job progress. Never used.
+ report.setJobName(jobInfo.getJobname());
+ report.setUser(jobInfo.getUsername());
+ //TODO Possibly populate job progress. Never used.
//report.setMapProgress(progress)
//report.setReduceProgress(progress)
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Thu Sep 8 01:39:07 2011
@@ -146,4 +146,10 @@ public class CompletedTaskAttempt implem
public long getFinishTime() {
return report.getFinishTime();
}
+
+ @Override
+ public int getShufflePort() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Sep 8 01:39:07 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedAction;
+import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -50,7 +51,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
@@ -61,24 +62,20 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
-import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
- private static final NotRunningJob NOTSTARTEDJOB =
- new NotRunningJob(JobState.NEW);
-
- private static final NotRunningJob FAILEDJOB =
- new NotRunningJob(JobState.FAILED);
-
- private static final NotRunningJob KILLEDJOB =
- new NotRunningJob(JobState.KILLED);
+
+ // Caches for per-user NotRunningJobs
+ private static HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs =
+ new HashMap<JobState, HashMap<String, NotRunningJob>>();
private final Configuration conf;
private final JobID jobId;
@@ -101,6 +98,24 @@ class ClientServiceDelegate {
this.appId = TypeConverter.toYarn(jobId).getAppId();
}
+ // Get the instance of the NotRunningJob corresponding to the specified
+ // user and state
+ private NotRunningJob getNotRunningJob(String user, JobState state) {
+ synchronized (notRunningJobs) {
+ HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
+ if (map == null) {
+ map = new HashMap<String, NotRunningJob>();
+ notRunningJobs.put(state, map);
+ }
+ NotRunningJob notRunningJob = map.get(user);
+ if (notRunningJob == null) {
+ notRunningJob = new NotRunningJob(user, state);
+ map.put(user, notRunningJob);
+ }
+ return notRunningJob;
+ }
+ }
+
private MRClientProtocol getProxy() throws YarnRemoteException {
if (!forceRefresh && realProxy != null) {
return realProxy;
@@ -149,26 +164,30 @@ class ClientServiceDelegate {
}
}
- /** we just want to return if its allocating, so that we dont
+ /** we just want to return if its allocating, so that we don't
* block on it. This is to be able to return job status
- * on a allocating Application.
+ * on an allocating Application.
*/
+ String user = application.getUser();
+ if (user == null) {
+ throw new YarnRemoteExceptionPBImpl("User is not set in the application report");
+ }
if (application.getState() == ApplicationState.NEW ||
application.getState() == ApplicationState.SUBMITTED) {
realProxy = null;
- return NOTSTARTEDJOB;
+ return getNotRunningJob(user, JobState.NEW);
}
if (application.getState() == ApplicationState.FAILED) {
realProxy = null;
- return FAILEDJOB;
+ return getNotRunningJob(user, JobState.FAILED);
}
if (application.getState() == ApplicationState.KILLED) {
- realProxy = null;
- return KILLEDJOB;
- }
+ realProxy = null;
+ return getNotRunningJob(user, JobState.KILLED);
+ }
//History server can serve a job only if application
//succeeded.
@@ -270,17 +289,15 @@ class ClientServiceDelegate {
return result;
}
- JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException,
- YarnRemoteException {
+ JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
- String stagingDir = conf.get("yarn.apps.stagingDir");
- String jobFile = stagingDir + "/" + jobId.toString();
- MRClientProtocol protocol;
GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
request.setJobId(jobId);
- JobReport report = ((GetJobReportResponse) invoke("getJobReport",
+ JobReport report = ((GetJobReportResponse) invoke("getJobReport",
GetJobReportRequest.class, request)).getJobReport();
+ String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
+
//TODO: add tracking url in JobReport
return TypeConverter.fromYarn(report, jobFile, "");
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Thu Sep 8 01:39:07 2011
@@ -63,8 +63,10 @@ public class NotRunningJob implements MR
RecordFactoryProvider.getRecordFactory(null);
private final JobState jobState;
+ private final String user;
- NotRunningJob(JobState jobState) {
+ NotRunningJob(String username, JobState jobState) {
+ this.user = username;
this.jobState = jobState;
}
@@ -104,7 +106,10 @@ public class NotRunningJob implements MR
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
- jobReport.setJobState(jobState);
+ jobReport.setJobState(this.jobState);
+
+ jobReport.setUser(this.user);
+ // TODO: Add jobName & other job information that is available
resp.setJobReport(jobReport);
return resp;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Sep 8 01:39:07 2011
@@ -120,7 +120,7 @@ public class ResourceMgrDelegate {
recordFactory.newRecordInstance(GetAllApplicationsRequest.class);
GetAllApplicationsResponse response =
applicationsManager.getAllApplications(request);
- return TypeConverter.fromYarnApps(response.getApplicationList());
+ return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf);
}
@@ -182,7 +182,7 @@ public class ResourceMgrDelegate {
getQueueInfoRequest(queueName, true, false, false);
recordFactory.newRecordInstance(GetQueueInfoRequest.class);
return TypeConverter.fromYarn(
- applicationsManager.getQueueInfo(request).getQueueInfo());
+ applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf);
}
private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent,
@@ -216,7 +216,7 @@ public class ResourceMgrDelegate {
getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
getChildQueues(rootQueue, queues);
- return TypeConverter.fromYarnQueueInfo(queues);
+ return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}
@@ -229,7 +229,7 @@ public class ResourceMgrDelegate {
getQueueInfoRequest(ROOT, false, true, false)).getQueueInfo();
getChildQueues(rootQueue, queues);
- return TypeConverter.fromYarnQueueInfo(queues);
+ return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}
public QueueInfo[] getChildQueues(String parent) throws IOException,
@@ -242,7 +242,7 @@ public class ResourceMgrDelegate {
getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
getChildQueues(parentQueue, queues);
- return TypeConverter.fromYarnQueueInfo(queues);
+ return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}
public String getStagingAreaDir() throws IOException, InterruptedException {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider Thu Sep 8 01:39:07 2011
@@ -12,17 +12,3 @@
# limitations under the License.
#
org.apache.hadoop.mapred.YarnClientProtocolProvider
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-org.apache.hadoop.mapred.YarnClientProtocolProvider
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Sep 8 01:39:07 2011
@@ -268,6 +268,7 @@ public class TestClientRedirect {
String[] split = AMHOSTADDRESS.split(":");
application.setHost(split[0]);
application.setRpcPort(Integer.parseInt(split[1]));
+ application.setUser("TestClientRedirect-user");
GetApplicationReportResponse response = recordFactory
.newRecordInstance(GetApplicationReportResponse.class);
response.setApplicationReport(application);
@@ -397,6 +398,11 @@ public class TestClientRedirect {
JobReport jobReport = recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
jobReport.setJobState(JobState.RUNNING);
+ jobReport.setJobName("TestClientRedirect-jobname");
+ jobReport.setUser("TestClientRedirect-user");
+ jobReport.setStartTime(0L);
+ jobReport.setFinishTime(1L);
+
GetJobReportResponse response = recordFactory
.newRecordInstance(GetJobReportResponse.class);
response.setJobReport(jobReport);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Sep 8 01:39:07 2011
@@ -72,6 +72,8 @@ public class MiniMRYarnCluster extends M
conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT,
ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
Service.class);
+ // Non-standard shuffle port
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083);
conf.setClass(NMConfig.NM_CONTAINER_EXECUTOR_CLASS,
DefaultContainerExecutor.class, ContainerExecutor.class);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Sep 8 01:39:07 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
@@ -105,7 +106,8 @@ public class TestMRJobs {
if (mrCluster == null) {
mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName());
- mrCluster.init(new Configuration());
+ Configuration conf = new Configuration();
+ mrCluster.init(conf);
mrCluster.start();
}
@@ -150,7 +152,7 @@ public class TestMRJobs {
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
verifySleepJobCounters(job);
-
+ verifyTaskProgress(job);
// TODO later: add explicit "isUber()" checks of some sort (extend
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
@@ -172,6 +174,18 @@ public class TestMRJobs {
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
}
+
+ protected void verifyTaskProgress(Job job) throws InterruptedException,
+ IOException {
+ for (TaskReport taskReport : job.getTaskReports(TaskType.MAP)) {
+ Assert.assertTrue(0.9999f < taskReport.getProgress()
+ && 1.0001f > taskReport.getProgress());
+ }
+ for (TaskReport taskReport : job.getTaskReports(TaskType.REDUCE)) {
+ Assert.assertTrue(0.9999f < taskReport.getProgress()
+ && 1.0001f > taskReport.getProgress());
+ }
+ }
@Test
public void testRandomWriter() throws IOException, InterruptedException,
@@ -197,6 +211,7 @@ public class TestMRJobs {
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
// Make sure there are three files in the output-dir
RemoteIterator<FileStatus> iterator =
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Sep 8 01:39:07 2011
@@ -120,7 +120,8 @@ public class ShuffleHandler extends Abst
private static final JobTokenSecretManager secretManager =
new JobTokenSecretManager();
- public static final String SHUFFLE_PORT = "mapreduce.shuffle.port";
+ public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
+ public static final int DEFAULT_SHUFFLE_PORT = 8080;
@Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener {
@@ -155,15 +156,59 @@ public class ShuffleHandler extends Abst
this(DefaultMetricsSystem.instance());
}
+ /**
+ * Serialize the shuffle port into a ByteBuffer for use later on.
+ * @param port the port to be sent to the ApplicationMaster
+ * @return the serialized form of the port.
+ */
+ static ByteBuffer serializeMetaData(int port) throws IOException {
+ //TODO these bytes should be versioned
+ DataOutputBuffer port_dob = new DataOutputBuffer();
+ port_dob.writeInt(port);
+ return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+ }
+
+ /**
+ * A helper function to deserialize the metadata returned by ShuffleHandler.
+ * @param meta the metadata returned by the ShuffleHandler
+ * @return the port the Shuffle Handler is listening on to serve shuffle data.
+ */
+ public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+ //TODO this should be returning a class not just an int
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(meta);
+ int port = in.readInt();
+ return port;
+ }
+
+ /**
+ * A helper function to serialize the JobTokenIdentifier to be sent to the
+ * ShuffleHandler as ServiceData.
+ * @param jobToken the job token to be used for authentication of
+ * shuffle data requests.
+ * @return the serialized version of the jobToken.
+ */
+ public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
+ //TODO these bytes should be versioned
+ DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+ jobToken.write(jobToken_dob);
+ return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+ }
+
+ static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(secret);
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+ jt.readFields(in);
+ return jt;
+ }
+
@Override
public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
// TODO these bytes should be versioned
try {
- DataInputByteBuffer in = new DataInputByteBuffer();
- in.reset(secret);
- Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
- jt.readFields(in);
- // TODO: Once SHuffle is out of NM, this can use MR APIs
+ Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
+ // TODO: Once Shuffle is out of NM, this can use MR APIs
JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
userRsrc.put(jobId.toString(), user);
LOG.info("Added token for " + jobId.toString());
@@ -193,7 +238,7 @@ public class ShuffleHandler extends Abst
Configuration conf = getConfig();
ServerBootstrap bootstrap = new ServerBootstrap(selector);
bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
- port = conf.getInt("mapreduce.shuffle.port", 8080);
+ port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
accepted.add(bootstrap.bind(new InetSocketAddress(port)));
LOG.info(getName() + " listening on port " + port);
super.start();
@@ -207,6 +252,17 @@ public class ShuffleHandler extends Abst
super.stop();
}
+ @Override
+ public synchronized ByteBuffer getMeta() {
+ try {
+ return serializeMetaData(port);
+ } catch (IOException e) {
+ LOG.error("Error during getMeta", e);
+ // TODO add API to AuxiliaryServices to report failures
+ return null;
+ }
+ }
+
Shuffle createShuffle() {
return new Shuffle(getConfig());
}
@@ -306,7 +362,7 @@ public class ShuffleHandler extends Abst
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
try {
verifyRequest(jobId, ctx, request, response,
- new URL("http", "", 8080, reqUri));
+ new URL("http", "", port, reqUri));
} catch (IOException e) {
LOG.warn("Shuffle failure ", e);
sendError(ctx, e.getMessage(), UNAUTHORIZED);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Thu Sep 8 01:39:07 2011
@@ -26,11 +26,21 @@ import static org.apache.hadoop.test.Met
import org.jboss.netty.channel.ChannelFuture;
import org.junit.Test;
+import static org.junit.Assert.*;
import static org.apache.hadoop.test.MockitoMaker.*;
public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
+ @Test public void testSerializeMeta() throws Exception {
+ assertEquals(1, ShuffleHandler.deserializeMetaData(
+ ShuffleHandler.serializeMetaData(1)));
+ assertEquals(-1, ShuffleHandler.deserializeMetaData(
+ ShuffleHandler.serializeMetaData(-1)));
+ assertEquals(8080, ShuffleHandler.deserializeMetaData(
+ ShuffleHandler.serializeMetaData(8080)));
+ }
+
@Test public void testShuffleMetrics() throws Exception {
MetricsSystem ms = new MetricsSystemImpl();
ShuffleHandler sh = new ShuffleHandler(ms);
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/bin/yarn?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/bin/yarn (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/bin/yarn Thu Sep 8 01:39:07 2011
@@ -148,132 +148,18 @@ IFS=
# add hadoop-common libs to CLASSPATH
-if [ -d "$HADOOP_COMMON_HOME/build/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/build/classes
-fi
-if [ -d "$HADOOP_COMMON_HOME/build/webapps" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/build
-fi
-if [ -d "$HADOOP_COMMON_HOME/build/test/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/build/test/classes
-fi
-if [ -d "$HADOOP_COMMON_HOME/build/test/core/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/build/test/core/classes
-fi
-
-for f in $HADOOP_COMMON_HOME/hadoop-*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-for f in $HADOOP_COMMON_HOME/lib/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-for f in $HADOOP_COMMON_HOME/share/hadoop/common/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-for f in $HADOOP_COMMON_HOME/share/hadoop/common/lib/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-for f in $HADOOP_COMMON_HOME/share/hadoop/hdfs/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-if [ -d "$HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Common/common" ]; then
-for f in $HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Common/common/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-fi
-
-if [ -d "$HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Hdfs/common" ]; then
-for f in $HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Hdfs/common/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-fi
-
-if [ -d "$HADOOP_COMMON_HOME/build/ivy/lib/Hadoop/common" ]; then
-for f in $HADOOP_COMMON_HOME/build/ivy/lib/Hadoop/common/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-fi
+CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/share/hadoop/common'/*'
+CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/share/hadoop/common/lib'/*'
# add hadoop-hdfs libs to CLASSPATH
-for f in $HADOOP_HDFS_HOME/hadoop-*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-for f in $HADOOP_HDFS_HOME/lib/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-if [ -d "$HADOOP_HDFS_HOME/build/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/build/classes
-fi
-if [ -d "$HADOOP_HDFS_HOME/build/webapps" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/build
-fi
-if [ -d "$HADOOP_HDFS_HOME/build/test/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/build/test/classes
-fi
-if [ -d "$HADOOP_HDFS_HOME/build/tools" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/build/tools
-fi
-
-# add hadoop-mapred libs to CLASSPATH
-
-for f in $HADOOP_HDFS_HOME/hadoop-*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-for f in $HADOOP_HDFS_HOME/lib/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-if [ -d "$HADOOP_MAPRED_HOME/build/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/classes
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/webapps" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/test/classes" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/test/classes
-fi
-if [ -d "$HADOOP_MAPRED_HOME/build/tools" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/build/tools
-fi
-
-# for releases, add core mapred jar & webapps to CLASSPATH
-if [ -d "$HADOOP_MAPRED_HOME/webapps" ]; then
- CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME
-fi
-
-# add libs to CLASSPATH
-for f in $HADOOP_MAPRED_HOME/lib/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-# add libs to CLASSPATH
-for f in $HADOOP_MAPRED_HOME/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-# add libs to CLASSPATH
-for f in $YARN_HOME/lib/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
+CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs'/*'
+CLASSPATH=${CLASSPATH}:$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib'/*'
# add yarn libs to CLASSPATH
-for f in $YARN_HOME/modules/*.jar; do
- CLASSPATH=${CLASSPATH}:$f;
-done
-
-# add user-specified CLASSPATH last
-if [ "$YARN_USER_CLASSPATH_FIRST" = "" ] && [ "$YARN_CLASSPATH" != "" ]; then
- CLASSPATH=${CLASSPATH}:${YARN_CLASSPATH}
-fi
+
+CLASSPATH=${CLASSPATH}:$YARN_HOME/modules'/*'
+CLASSPATH=${CLASSPATH}:$YARN_HOME/lib'/*'
# default log directory & file
if [ "$YARN_LOG_DIR" = "" ]; then
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/AMRMProtocol.java Thu Sep 8 01:39:07 2011
@@ -18,16 +18,94 @@
package org.apache.hadoop.yarn.api;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+/**
+ * <p>The protocol between a live instance of <code>ApplicationMaster</code>
+ * and the <code>ResourceManager</code>.</p>
+ *
+ * <p>This is used by the <code>ApplicationMaster</code> to register/unregister
+ * and to request and obtain resources in the cluster from the
+ * <code>ResourceManager</code>.</p>
+ */
+@Public
+@Stable
public interface AMRMProtocol {
- public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException;
- public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException;;
- public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by a new <code>ApplicationMaster</code> to register
+ * with the <code>ResourceManager</code>.</p>
+ *
+ * <p>The <code>ApplicationMaster</code> needs to provide details such
+ * as RPC Port, HTTP tracking url etc. as specified in
+ * {@link RegisterApplicationMasterRequest}.</p>
+ *
+ * <p>The <code>ResourceManager</code> responds with critical details such
+ * as minimum and maximum resource capabilities in the cluster as specified in
+ * {@link RegisterApplicationMasterResponse}.</p>
+ *
+ * @param request registration request
+ * @return registration response
+ * @throws YarnRemoteException
+ */
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by an <code>ApplicationMaster</code> to notify the
+ * <code>ResourceManager</code> about its completion (success or failed).</p>
+ *
+ * <p>The <code>ApplicationMaster</code> has to provide details such as
+ * final state, diagnostics (in case of failures) etc. as specified in
+ * {@link FinishApplicationMasterRequest}.</p>
+ *
+ * <p>The <code>ResourceManager</code> responds with
+ * {@link FinishApplicationMasterResponse}.</p>
+ *
+ * @param request completion request
+ * @return completion response
+ * @throws YarnRemoteException
+ */
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The main interface between an <code>ApplicationMaster</code>
+ * and the <code>ResourceManager</code>.</p>
+ *
+ * <p>The <code>ApplicationMaster</code> uses this interface to provide a list
+ * of {@link ResourceRequest} and returns unused {@link Container} allocated
+ * to it via {@link AllocateRequest}.</p>
+ *
+ * <p>This also doubles up as a <em>heartbeat</em> to let the
+ * <code>ResourceManager</code> know that the <code>ApplicationMaster</code>
+ * is alive. Thus, applications should periodically make this call to
+ * be kept alive.</p>
+ *
+ * <p>The <code>ResourceManager</code> responds with a list of allocated
+ * {@link Container}, status of completed containers and headroom information
+ * for the application.</p>
+ *
+ * <p>The <code>ApplicationMaster</code> can use the available headroom
+ * (resources) to decide how to utilize allocated resources and make
+ * informed decisions about future resource requests.</p>
+ *
+ * @param request allocation request
+ * @return allocation response
+ * @throws YarnRemoteException
+ */
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnRemoteException;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ClientRMProtocol.java Thu Sep 8 01:39:07 2011
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.api;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
@@ -36,16 +39,190 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+/**
+ * <p>The protocol between clients and the <code>ResourceManager</code>
+ * to submit/abort jobs and to get information on applications, cluster metrics,
+ * nodes, queues and ACLs.</p>
+ */
+@Public
+@Stable
public interface ClientRMProtocol {
- public GetNewApplicationIdResponse getNewApplicationId(GetNewApplicationIdRequest request) throws YarnRemoteException;
- public GetApplicationReportResponse getApplicationReport(GetApplicationReportRequest request) throws YarnRemoteException;
- public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnRemoteException;
- public FinishApplicationResponse finishApplication(FinishApplicationRequest request) throws YarnRemoteException;
- public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request) throws YarnRemoteException;
- public GetAllApplicationsResponse getAllApplications(GetAllApplicationsRequest request) throws YarnRemoteException;
- public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) throws YarnRemoteException;
- public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) throws YarnRemoteException;
- public GetQueueUserAclsInfoResponse getQueueUserAcls(GetQueueUserAclsInfoRequest request) throws YarnRemoteException;
+ /**
+ * <p>The interface used by clients to obtain a new {@link ApplicationId} for
+ * submitting new applications.</p>
+ *
+ * <p>The <code>ResourceManager</code> responds with a new, monotonically
+ * increasing, {@link ApplicationId} which is used by the client to submit
+ * a new application.</p>
+ *
+ * @param request request to get a new <code>ApplicationId</code>
+ * @return new <code>ApplicationId</code> to be used to submit an application
+ * @throws YarnRemoteException
+ * @see #submitApplication(SubmitApplicationRequest)
+ */
+ public GetNewApplicationIdResponse getNewApplicationId(
+ GetNewApplicationIdRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by clients to submit a new application to the
+ * <code>ResourceManager</code>.</p>
+ *
+ * <p>The client is required to provide details such as queue,
+ * {@link Resource} required to run the <code>ApplicationMaster</code>,
+ * the equivalent of {@link ContainerLaunchContext} for launching
+ * the <code>ApplicationMaster</code> etc. via the
+ * {@link SubmitApplicationRequest}.</p>
+ *
+ * <p>Currently the <code>ResourceManager</code> sends an immediate (empty)
+ * {@link SubmitApplicationResponse} on accepting the submission and throws
+ * an exception if it rejects the submission.</p>
+ *
+ * <p>In secure mode, the <code>ResourceManager</code> verifies access to
+ * queues etc. before accepting the application submission.</p>
+ *
+ * @param request request to submit a new application
+ * @return (empty) response on accepting the submission
+ * @throws YarnRemoteException
+ * @see #getNewApplicationId(GetNewApplicationIdRequest)
+ */
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by clients to request the
+ * <code>ResourceManager</code> to abort a submitted application.</p>
+ *
+ * <p>The client, via {@link FinishApplicationRequest} provides the
+ * {@link ApplicationId} of the application to be aborted.</p>
+ *
+ * <p>In secure mode, the <code>ResourceManager</code> verifies access to the
+ * application, queue etc. before terminating the application.</p>
+ *
+ * <p>Currently, the <code>ResourceManager</code> returns an empty response
+ * on success and throws an exception on rejecting the request.</p>
+ *
+ * @param request request to abort a submitted application
+ * @return <code>ResourceManager</code> returns an empty response
+ * on success and throws an exception on rejecting the request
+ * @throws YarnRemoteException
+ * @see #getQueueUserAcls(GetQueueUserAclsInfoRequest)
+ */
+ public FinishApplicationResponse finishApplication(
+ FinishApplicationRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by clients to get a report of an Application from
+ * the <code>ResourceManager</code>.</p>
+ *
+ * <p>The client, via {@link GetApplicationReportRequest} provides the
+ * {@link ApplicationId} of the application.</p>
+ *
+ * <p>In secure mode, the <code>ResourceManager</code> verifies access to the
+ * application, queue etc. before accepting the request.</p>
+ *
+ * <p>The <code>ResourceManager</code> responds with a
+ * {@link GetApplicationReportResponse} which includes the
+ * {@link ApplicationReport} for the application.</p>
+ *
+ * @param request request for an application report
+ * @return application report
+ * @throws YarnRemoteException
+ */
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by clients to get metrics about the cluster from
+ * the <code>ResourceManager</code>.</p>
+ *
+ * <p>The <code>ResourceManager</code> responds with a
+ * {@link GetClusterMetricsResponse} which includes the
+ * {@link YarnClusterMetrics} with details such as number of current
+ * nodes in the cluster.</p>
+ *
+ * @param request request for cluster metrics
+ * @return cluster metrics
+ * @throws YarnRemoteException
+ */
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by clients to get a report of all Applications
+ * in the cluster from the <code>ResourceManager</code>.</p>
+ *
+ * <p>The <code>ResourceManager</code> responds with a
+ * {@link GetAllApplicationsResponse} which includes the
+ * {@link ApplicationReport} for all the applications.</p>
+ *
+ * @param request request for report on all running applications
+ * @return report on all running applications
+ * @throws YarnRemoteException
+ */
+ public GetAllApplicationsResponse getAllApplications(
+ GetAllApplicationsRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by clients to get a report of all nodes
+ * in the cluster from the <code>ResourceManager</code>.</p>
+ *
+ * <p>The <code>ResourceManager</code> responds with a
+ * {@link GetClusterNodesResponse} which includes the
+ * {@link NodeReport} for all the nodes in the cluster.</p>
+ *
+ * @param request request for report on all nodes
+ * @return report on all nodes
+ * @throws YarnRemoteException
+ */
+ public GetClusterNodesResponse getClusterNodes(
+ GetClusterNodesRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by clients to get information about <em>queues</em>
+ * from the <code>ResourceManager</code>.</p>
+ *
+ * <p>The client, via {@link GetQueueInfoRequest}, can ask for details such
+ * as used/total resources, child queues, running applications etc.</p>
+ *
+ * <p>In secure mode, the <code>ResourceManager</code> verifies access before
+ * providing the information.</p>
+ *
+ * @param request request to get queue information
+ * @return queue information
+ * @throws YarnRemoteException
+ */
+ public GetQueueInfoResponse getQueueInfo(
+ GetQueueInfoRequest request)
+ throws YarnRemoteException;
+
+ /**
+ * <p>The interface used by clients to get information about <em>queue
+ * acls</em> for <em>current users</em> from the <code>ResourceManager</code>.
+ * </p>
+ *
+ * <p>The <code>ResourceManager</code> responds with queue acls for all
+ * existing queues.</p>
+ *
+ * @param request request to get queue acls for <em>current user</em>
+ * @return queue acls for <em>current user</em>
+ * @throws YarnRemoteException
+ */
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request)
+ throws YarnRemoteException;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManager.java?rev=1166495&r1=1166494&r2=1166495&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ContainerManager.java Thu Sep 8 01:39:07 2011
@@ -18,21 +18,108 @@
package org.apache.hadoop.yarn.api;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+/**
+ * <p>The protocol between an <code>ApplicationMaster</code> and a
+ * <code>NodeManager</code> to start/stop containers and to get status
+ * of running containers.</p>
+ *
+ * <p>If security is enabled the <code>NodeManager</code> verifies that the
+ * <code>ApplicationMaster</code> has truly been allocated the container
+ * by the <code>ResourceManager</code> and also verifies all interactions such
+ * as stopping the container or obtaining status information for the container.
+ * </p>
+ */
+@Public
+@Stable
public interface ContainerManager {
+ /**
+ * <p>The <code>ApplicationMaster</code> requests a <code>NodeManager</code>
+ * to <em>start</em> a {@link Container} allocated to it using this interface.
+ * </p>
+ *
+ * <p>The <code>ApplicationMaster</code> has to provide details such as
+ * allocated resource capability, security tokens (if enabled), command
+ * to be executed to start the container, environment for the process,
+ * necessary binaries/jar/shared-objects etc. via the
+ * {@link ContainerLaunchContext} in the {@link StartContainerRequest}.</p>
+ *
+ * <p>Currently the <code>NodeManager</code> sends an immediate, empty
+ * response via {@link StartContainerResponse} to signify acceptance of the
+ * request and throws an exception in case of errors. The
+ * <code>ApplicationMaster</code> can use
+ * {@link #getContainerStatus(GetContainerStatusRequest)} to get updated
+ * status of the to-be-launched or launched container.</p>
+ *
+ * @param request request to start a container
+ * @return empty response to indicate acceptance of the request
+ * or an exception
+ * @throws YarnRemoteException
+ */
+ @Public
+ @Stable
StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException;
+ /**
+ * <p>The <code>ApplicationMaster</code> requests a <code>NodeManager</code>
+ * to <em>stop</em> a {@link Container} allocated to it using this interface.
+ * </p>
+ *
+ * <p>The <code>ApplicationMaster</code> sends a
+ * {@link StopContainerRequest} which includes the {@link ContainerId} of the
+ * container to be stopped.</p>
+ *
+ * <p>Currently the <code>NodeManager</code> sends an immediate, empty
+ * response via {@link StopContainerResponse} to signify acceptance of the
+ * request and throws an exception in case of errors. The
+ * <code>ApplicationMaster</code> can use
+ * {@link #getContainerStatus(GetContainerStatusRequest)} to get updated
+ * status of the container.</p>
+ *
+ * @param request request to stop a container
+ * @return empty response to indicate acceptance of the request
+ * or an exception
+ * @throws YarnRemoteException
+ */
+ @Public
+ @Stable
StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException;
+ /**
+ * <p>The api used by the <code>ApplicationMaster</code> to request for
+ * current status of a <code>Container</code> from the
+ * <code>NodeManager</code>.</p>
+ *
+ * <p>The <code>ApplicationMaster</code> sends a
+ * {@link GetContainerStatusRequest} which includes the {@link ContainerId} of
+ * the container whose status is needed.</p>
+ *
+ *<p>The <code>NodeManager</code> responds with
+ *{@link GetContainerStatusResponse} which includes the
+ *{@link ContainerStatus} of the container.</p>
+ *
+ * @param request request to get <code>ContainerStatus</code> of a container
+ * with the specified <code>ContainerId</code>
+ * @return <code>ContainerStatus</code> of the container
+ * @throws YarnRemoteException
+ */
+ @Public
+ @Stable
GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException;
}