You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [28/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/StartEndTimesBase.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/StartEndTimesBase.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/StartEndTimesBase.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/StartEndTimesBase.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,213 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+
+abstract class StartEndTimesBase implements TaskRuntimeEstimator {
+ static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE
+ = 0.05F;
+ static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+ = 1;
+
+ protected Configuration conf = null;
+ protected AppContext context = null;
+
+ protected final Map<TaskAttemptId, Long> startTimes
+ = new ConcurrentHashMap<TaskAttemptId, Long>();
+
+ // XXXX This class design assumes that the contents of AppContext.getAllJobs
+ // never changes. Is that right?
+ //
+ // This assumption comes in in several places, mostly in data structure that
+ // can grow without limit if a AppContext gets new Job's when the old ones
+ // run out. Also, these mapper statistics blocks won't cover the Job's
+ // we don't know about.
+ protected final Map<Job, DataStatistics> mapperStatistics
+ = new HashMap<Job, DataStatistics>();
+ protected final Map<Job, DataStatistics> reducerStatistics
+ = new HashMap<Job, DataStatistics>();
+
+
+ private final Map<Job, Float> slowTaskRelativeTresholds
+ = new HashMap<Job, Float>();
+
+ protected final Set<Task> doneTasks = new HashSet<Task>();
+
+ @Override
+ public void enrollAttempt(TaskAttemptStatus status, long timestamp) {
+ startTimes.put(status.id,timestamp);
+ }
+
+ @Override
+ public long attemptEnrolledTime(TaskAttemptId attemptID) {
+ Long result = startTimes.get(attemptID);
+
+ return result == null ? Long.MAX_VALUE : result;
+ }
+
+
+ @Override
+ public void contextualize(Configuration conf, AppContext context) {
+ this.conf = conf;
+ this.context = context;
+
+ Map<JobId, Job> allJobs = context.getAllJobs();
+
+ for (Map.Entry<JobId, Job> entry : allJobs.entrySet()) {
+ final Job job = entry.getValue();
+ mapperStatistics.put(job, new DataStatistics());
+ reducerStatistics.put(job, new DataStatistics());
+ slowTaskRelativeTresholds.put
+ (job, conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
+ }
+ }
+
+ protected DataStatistics dataStatisticsForTask(TaskId taskID) {
+ JobId jobID = taskID.getJobId();
+ Job job = context.getJob(jobID);
+
+ if (job == null) {
+ return null;
+ }
+
+ Task task = job.getTask(taskID);
+
+ if (task == null) {
+ return null;
+ }
+
+ return task.getType() == TaskType.MAP
+ ? mapperStatistics.get(job)
+ : task.getType() == TaskType.REDUCE
+ ? reducerStatistics.get(job)
+ : null;
+ }
+
+ @Override
+ public long thresholdRuntime(TaskId taskID) {
+ JobId jobID = taskID.getJobId();
+ Job job = context.getJob(jobID);
+
+ TaskType type = taskID.getTaskType();
+
+ DataStatistics statistics
+ = dataStatisticsForTask(taskID);
+
+ int completedTasksOfType
+ = type == TaskType.MAP
+ ? job.getCompletedMaps() : job.getCompletedReduces();
+
+ int totalTasksOfType
+ = type == TaskType.MAP
+ ? job.getTotalMaps() : job.getTotalReduces();
+
+ if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
+ || (((float)completedTasksOfType) / totalTasksOfType)
+ < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
+ return Long.MAX_VALUE;
+ }
+
+ long result = statistics == null
+ ? Long.MAX_VALUE
+ : (long)statistics.outlier(slowTaskRelativeTresholds.get(job));
+ return result;
+ }
+
+ @Override
+ public long estimatedNewAttemptRuntime(TaskId id) {
+ DataStatistics statistics = dataStatisticsForTask(id);
+
+ if (statistics == null) {
+ return -1L;
+ }
+
+ return (long)statistics.mean();
+ }
+
+ @Override
+ public void updateAttempt(TaskAttemptStatus status, long timestamp) {
+
+ TaskAttemptId attemptID = status.id;
+ TaskId taskID = attemptID.getTaskId();
+ JobId jobID = taskID.getJobId();
+ Job job = context.getJob(jobID);
+
+ if (job == null) {
+ return;
+ }
+
+ Task task = job.getTask(taskID);
+
+ if (task == null) {
+ return;
+ }
+
+ Long boxedStart = startTimes.get(attemptID);
+ long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
+
+ TaskAttempt taskAttempt = task.getAttempt(attemptID);
+
+ if (taskAttempt.getState() == TaskAttemptState.SUCCEEDED) {
+ boolean isNew = false;
+ // is this a new success?
+ synchronized (doneTasks) {
+ if (!doneTasks.contains(task)) {
+ doneTasks.add(task);
+ isNew = true;
+ }
+ }
+
+ // It's a new completion
+ // Note that if a task completes twice [because of a previous speculation
+ // and a race, or a success followed by loss of the machine with the
+ // local data] we only count the first one.
+ if (isNew) {
+ long finish = timestamp;
+ if (start > 1L && finish > 1L && start <= finish) {
+ long duration = finish - start;
+
+ DataStatistics statistics
+ = dataStatisticsForTask(taskID);
+
+ if (statistics != null) {
+ statistics.add(duration);
+ }
+ }
+ }
+ }
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskRuntimeEstimator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskRuntimeEstimator.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskRuntimeEstimator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,90 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+
+
+
+public interface TaskRuntimeEstimator {
+ public void enrollAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+
+ public long attemptEnrolledTime(TaskAttemptId attemptID);
+
+ public void updateAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+
+ public void contextualize(Configuration conf, AppContext context);
+
+ /**
+ *
+ * Find a maximum reasonable execution wallclock time. Includes the time
+ * already elapsed.
+ *
+ * Find a maximum reasonable execution time. Includes the time
+ * already elapsed. If the projected total execution time for this task
+ * ever exceeds its reasonable execution time, we may speculate it.
+ *
+ * @param id the {@link TaskId} of the task we are asking about
+ * @return the task's maximum reasonable runtime, or MAX_VALUE if
+ * we don't have enough information to rule out any runtime,
+ * however long.
+ *
+ */
+ public long thresholdRuntime(TaskId id);
+
+ /**
+ *
+ * Estimate a task attempt's total runtime. Includes the time already
+ * elapsed.
+ *
+ * @param id the {@link TaskAttemptId} of the attempt we are asking about
+ * @return our best estimate of the attempt's runtime, or {@code -1} if
+ * we don't have enough information yet to produce an estimate.
+ *
+ */
+ public long estimatedRuntime(TaskAttemptId id);
+
+ /**
+ *
+ * Estimates how long a new attempt on this task will take if we start
+ * one now
+ *
+ * @param id the {@link TaskId} of the task we are asking about
+ * @return our best estimate of a new attempt's runtime, or {@code -1} if
+ * we don't have enough information yet to produce an estimate.
+ *
+ */
+ public long estimatedNewAttemptRuntime(TaskId id);
+
+ /**
+ *
+ * Computes the width of the error band of our estimate of the task
+ * runtime as returned by {@link #estimatedRuntime(TaskAttemptId)}
+ *
+ * @param id the {@link TaskAttemptId} of the attempt we are asking about
+ * @return our best estimate of the attempt's runtime, or {@code -1} if
+ * we don't have enough information yet to produce an estimate.
+ *
+ */
+ public long runtimeEstimateVariance(TaskAttemptId id);
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskSpeculationPredicate.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskSpeculationPredicate.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskSpeculationPredicate.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/TaskSpeculationPredicate.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+
+
+public class TaskSpeculationPredicate {
+ boolean canSpeculate(AppContext context, TaskId taskID) {
+ // This class rejects speculating any task that already has speculations,
+ // or isn't running.
+ // Subclasses should call TaskSpeculationPredicate.canSpeculate(...) , but
+ // can be even more restrictive.
+ JobId jobID = taskID.getJobId();
+ Job job = context.getJob(jobID);
+ Task task = job.getTask(taskID);
+ return task.getAttempts().size() == 1;
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/package-info.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/package-info.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/speculate/package-info.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.speculate;
+import org.apache.hadoop.classification.InterfaceAudience;
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskAttemptCleanupEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskAttemptCleanupEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskAttemptCleanupEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskAttemptCleanupEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * This class encapsulates task cleanup event.
+ *
+ */
+public class TaskAttemptCleanupEvent extends
+ AbstractEvent<TaskCleaner.EventType> {
+
+ private final TaskAttemptId attemptID;
+ private final OutputCommitter committer;
+ private final TaskAttemptContext attemptContext;
+ private final ContainerId containerId;
+
+ public TaskAttemptCleanupEvent(TaskAttemptId attemptID,
+ ContainerId containerId, OutputCommitter committer,
+ TaskAttemptContext attemptContext) {
+ super(TaskCleaner.EventType.TASK_CLEAN);
+ this.attemptID = attemptID;
+ this.containerId = containerId;
+ this.committer = committer;
+ this.attemptContext = attemptContext;
+ }
+
+ public TaskAttemptId getAttemptID() {
+ return attemptID;
+ }
+
+ public OutputCommitter getCommitter() {
+ return committer;
+ }
+
+ public TaskAttemptContext getAttemptContext() {
+ return attemptContext;
+ }
+
+ /**
+ * containerId could be null if the container task attempt had not started.
+ * @return
+ */
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleaner.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleaner.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleaner.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleaner.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface TaskCleaner extends EventHandler<TaskCleanupEvent> {
+
+ enum EventType {
+ // TODO XXX Rename this event once the code is more stable.
+ TASK_CLEAN,
+ CONTAINER_COMPLETED,
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerContainerCompletedEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerContainerCompletedEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerContainerCompletedEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerContainerCompletedEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class TaskCleanerContainerCompletedEvent extends AbstractEvent<TaskCleaner.EventType> {
+
+ private ContainerId containerId;
+
+ public TaskCleanerContainerCompletedEvent(ContainerId containerId) {
+ super(TaskCleaner.EventType.CONTAINER_COMPLETED);
+ this.containerId = containerId;
+ }
+
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerImpl.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanerImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
+
+ private static final Log LOG = LogFactory.getLog(TaskCleanerImpl.class);
+
+ private final AppContext context;
+ private ThreadPoolExecutor launcherPool;
+ private Thread eventHandlingThread;
+ private BlockingQueue<TaskCleanupEvent> eventQueue =
+ new LinkedBlockingQueue<TaskCleanupEvent>();
+
+ public TaskCleanerImpl(AppContext context) {
+ super("TaskCleaner");
+ this.context = context;
+ }
+
+ public void start() {
+ ThreadFactory tf = new ThreadFactoryBuilder()
+ .setNameFormat("TaskCleaner #%d")
+ .build();
+ launcherPool = new ThreadPoolExecutor(5, 5, 1,
+ TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+ eventHandlingThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ TaskCleanupEvent event = null;
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ event = eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.error("Returning, interrupted : " + e);
+ return;
+ }
+ // the events from the queue are handled in parallel
+ // using a thread pool
+ launcherPool.execute(new EventProcessor(event)); }
+ }
+ });
+ eventHandlingThread.setName("TaskCleaner Event Handler");
+ eventHandlingThread.start();
+ super.start();
+ }
+
+ public void stop() {
+ eventHandlingThread.interrupt();
+ launcherPool.shutdown();
+ Iterator<TaskCleanupEvent> it = eventQueue.iterator();
+ while(it.hasNext()) {
+ TaskCleanupEvent ev = it.next();
+ LOG.info("TaskCleaner.stop: Cleanup for: " + ev.getAttemptID());
+ new EventProcessor(ev).run();
+ }
+ super.stop();
+ }
+
+ private class EventProcessor implements Runnable {
+ private TaskCleanupEvent event;
+
+ EventProcessor(TaskCleanupEvent event) {
+ this.event = event;
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Processing the event " + event.toString());
+ try {
+ event.getCommitter().abortTask(event.getAttemptContext());
+ } catch (Exception e) {
+ LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
+ }
+ }
+ }
+
+ @Override
+ public void handle(TaskCleanupEvent event) {
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ throw new YarnException(e);
+ }
+ }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanupEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanupEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanupEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/TaskCleanupEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * This class encapsulates task cleanup event.
+ *
+ */
+public class TaskCleanupEvent extends AbstractEvent<TaskCleaner.EventType> {
+ // TODO XXX: Rename to TaskAttemptCleanupEvent ?
+
+ // TODO XXX: Maybe include the containerId along with this event. Otherwise depend on events coming in from the Container to include TaskAttemptIds.
+ private final TaskAttemptId attemptID;
+ private final OutputCommitter committer;
+ private final TaskAttemptContext attemptContext;
+
+ public TaskCleanupEvent(TaskAttemptId attemptID, OutputCommitter committer,
+ TaskAttemptContext attemptContext) {
+ super(TaskCleaner.EventType.TASK_CLEAN);
+ this.attemptID = attemptID;
+ this.committer = committer;
+ this.attemptContext = attemptContext;
+ }
+
+ public TaskAttemptId getAttemptID() {
+ return attemptID;
+ }
+
+ public OutputCommitter getCommitter() {
+ return committer;
+ }
+
+ public TaskAttemptContext getAttemptContext() {
+ return attemptContext;
+ }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/package-info.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/package-info.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/taskclean/package-info.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.taskclean;
+import org.apache.hadoop.classification.InterfaceAudience;
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMParams.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMParams.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMParams.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMParams.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+/**
+ * Params constants for the AM webapp and the history webapp.
+ */
+public interface AMParams {
+ static final String RM_WEB = "rm.web";
+ static final String APP_ID = "app.id";
+ static final String JOB_ID = "job.id";
+ static final String TASK_ID = "task.id";
+ static final String TASK_TYPE = "task.type";
+ static final String ATTEMPT_STATE = "attempt.state";
+ static final String COUNTER_GROUP = "counter.group";
+ static final String COUNTER_NAME = "counter.name";
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMWebApp.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMWebApp.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMWebApp.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMWebApp.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,51 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
+
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.WebApp;
+
+/**
+ * Application master webapp
+ */
+public class AMWebApp extends WebApp implements AMParams {
+
+ @Override
+ public void setup() {
+ bind(JAXBContextResolver.class);
+ bind(GenericExceptionHandler.class);
+ bind(AMWebServices.class);
+ route("/", AppController.class);
+ route("/app", AppController.class);
+ route(pajoin("/job", JOB_ID), AppController.class, "job");
+ route(pajoin("/conf", JOB_ID), AppController.class, "conf");
+ route(pajoin("/jobcounters", JOB_ID), AppController.class, "jobCounters");
+ route(pajoin("/singlejobcounter",JOB_ID, COUNTER_GROUP, COUNTER_NAME),
+ AppController.class, "singleJobCounter");
+ route(pajoin("/tasks", JOB_ID, TASK_TYPE), AppController.class, "tasks");
+ route(pajoin("/attempts", JOB_ID, TASK_TYPE, ATTEMPT_STATE),
+ AppController.class, "attempts");
+ route(pajoin("/task", TASK_ID), AppController.class, "task");
+ route(pajoin("/taskcounters", TASK_ID), AppController.class, "taskCounters");
+ route(pajoin("/singletaskcounter",TASK_ID, COUNTER_GROUP, COUNTER_NAME),
+ AppController.class, "singleTaskCounter");
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMWebServices.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMWebServices.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMWebServices.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AMWebServices.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.AppInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.AMAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.AMAttemptsInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.ConfInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.JobCounterInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.JobInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.JobTaskAttemptCounterInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.JobTaskCounterInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.JobsInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.ReduceTaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.TaskAttemptsInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.TasksInfo;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Inject;
+
+@Path("/ws/v1/mapreduce")
+public class AMWebServices {
+ private final AppContext appCtx;
+ private final App app;
+
+ private @Context HttpServletResponse response;
+
+ @Inject
+ public AMWebServices(final App app, final AppContext context) {
+ this.appCtx = context;
+ this.app = app;
+ }
+
+ Boolean hasAccess(Job job, HttpServletRequest request) {
+ String remoteUser = request.getRemoteUser();
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ if (callerUGI != null && !job.checkAccess(callerUGI, JobACL.VIEW_JOB)) {
+ return false;
+ }
+ return true;
+ }
+
+ private void init() {
+ //clear content type
+ response.setContentType(null);
+ }
+
+ /**
+ * convert a job id string to an actual job and handle all the error checking.
+ */
+ public static Job getJobFromJobIdString(String jid, AppContext appCtx) throws NotFoundException {
+ JobId jobId;
+ Job job;
+ try {
+ jobId = MRApps.toJobID(jid);
+ } catch (YarnException e) {
+ // TODO: after MAPREDUCE-2793 YarnException is probably not expected here
+ // anymore but keeping it for now just in case other stuff starts failing.
+ // Also, the webservice should ideally return BadRequest (HTTP:400) when
+ // the id is malformed instead of NotFound (HTTP:404). The webserver on
+ // top of which AMWebServices is built seems to automatically do that for
+ // unhandled exceptions
+ throw new NotFoundException(e.getMessage());
+ } catch (IllegalArgumentException e) {
+ throw new NotFoundException(e.getMessage());
+ }
+ if (jobId == null) {
+ throw new NotFoundException("job, " + jid + ", is not found");
+ }
+ job = appCtx.getJob(jobId);
+ if (job == null) {
+ throw new NotFoundException("job, " + jid + ", is not found");
+ }
+ return job;
+ }
+
+ /**
+ * convert a task id string to an actual task and handle all the error
+ * checking.
+ */
+ public static Task getTaskFromTaskIdString(String tid, Job job) throws NotFoundException {
+ TaskId taskID;
+ Task task;
+ try {
+ taskID = MRApps.toTaskID(tid);
+ } catch (YarnException e) {
+ // TODO: after MAPREDUCE-2793 YarnException is probably not expected here
+ // anymore but keeping it for now just in case other stuff starts failing.
+ // Also, the webservice should ideally return BadRequest (HTTP:400) when
+ // the id is malformed instead of NotFound (HTTP:404). The webserver on
+ // top of which AMWebServices is built seems to automatically do that for
+ // unhandled exceptions
+ throw new NotFoundException(e.getMessage());
+ } catch (NumberFormatException ne) {
+ throw new NotFoundException(ne.getMessage());
+ } catch (IllegalArgumentException e) {
+ throw new NotFoundException(e.getMessage());
+ }
+ if (taskID == null) {
+ throw new NotFoundException("taskid " + tid + " not found or invalid");
+ }
+ task = job.getTask(taskID);
+ if (task == null) {
+ throw new NotFoundException("task not found with id " + tid);
+ }
+ return task;
+ }
+
+ /**
+ * convert a task attempt id string to an actual task attempt and handle all
+ * the error checking.
+ */
+ public static TaskAttempt getTaskAttemptFromTaskAttemptString(String attId, Task task)
+ throws NotFoundException {
+ TaskAttemptId attemptId;
+ TaskAttempt ta;
+ try {
+ attemptId = MRApps.toTaskAttemptID(attId);
+ } catch (YarnException e) {
+ // TODO: after MAPREDUCE-2793 YarnException is probably not expected here
+ // anymore but keeping it for now just in case other stuff starts failing.
+ // Also, the webservice should ideally return BadRequest (HTTP:400) when
+ // the id is malformed instead of NotFound (HTTP:404). The webserver on
+ // top of which AMWebServices is built seems to automatically do that for
+ // unhandled exceptions
+ throw new NotFoundException(e.getMessage());
+ } catch (NumberFormatException ne) {
+ throw new NotFoundException(ne.getMessage());
+ } catch (IllegalArgumentException e) {
+ throw new NotFoundException(e.getMessage());
+ }
+ if (attemptId == null) {
+ throw new NotFoundException("task attempt id " + attId
+ + " not found or invalid");
+ }
+ ta = task.getAttempt(attemptId);
+ if (ta == null) {
+ throw new NotFoundException("Error getting info on task attempt id "
+ + attId);
+ }
+ return ta;
+ }
+
+
+ /**
+ * check for job access.
+ *
+ * @param job
+ * the job that is being accessed
+ */
+ void checkAccess(Job job, HttpServletRequest request) {
+ if (!hasAccess(job, request)) {
+ throw new WebApplicationException(Status.UNAUTHORIZED);
+ }
+ }
+
+ @GET
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public AppInfo get() {
+ return getAppInfo();
+ }
+
+ @GET
+ @Path("/info")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public AppInfo getAppInfo() {
+ init();
+ return new AppInfo(this.app, this.app.context);
+ }
+
+ @GET
+ @Path("/jobs")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public JobsInfo getJobs(@Context HttpServletRequest hsr) {
+ init();
+ JobsInfo allJobs = new JobsInfo();
+ for (Job job : appCtx.getAllJobs().values()) {
+ // getAllJobs only gives you a partial we want a full
+ Job fullJob = appCtx.getJob(job.getID());
+ if (fullJob == null) {
+ continue;
+ }
+ allJobs.add(new JobInfo(fullJob, hasAccess(fullJob, hsr)));
+ }
+ return allJobs;
+ }
+
+ @GET
+ @Path("/jobs/{jobid}")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public JobInfo getJob(@Context HttpServletRequest hsr,
+ @PathParam("jobid") String jid) {
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ return new JobInfo(job, hasAccess(job, hsr));
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/jobattempts")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) {
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ AMAttemptsInfo amAttempts = new AMAttemptsInfo();
+ for (AMInfo amInfo : job.getAMInfos()) {
+ AMAttemptInfo attempt = new AMAttemptInfo(amInfo, MRApps.toString(
+ job.getID()), job.getUserName());
+ amAttempts.add(attempt);
+ }
+ return amAttempts;
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/counters")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public JobCounterInfo getJobCounters(@Context HttpServletRequest hsr,
+ @PathParam("jobid") String jid) {
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ checkAccess(job, hsr);
+ return new JobCounterInfo(this.appCtx, job);
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/conf")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public ConfInfo getJobConf(@Context HttpServletRequest hsr,
+ @PathParam("jobid") String jid) {
+
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ checkAccess(job, hsr);
+ ConfInfo info;
+ try {
+ info = new ConfInfo(job);
+ } catch (IOException e) {
+ throw new NotFoundException("unable to load configuration for job: "
+ + jid);
+ }
+ return info;
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/tasks")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public TasksInfo getJobTasks(@Context HttpServletRequest hsr,
+ @PathParam("jobid") String jid, @QueryParam("type") String type) {
+
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ checkAccess(job, hsr);
+ TasksInfo allTasks = new TasksInfo();
+ for (Task task : job.getTasks().values()) {
+ TaskType ttype = null;
+ if (type != null && !type.isEmpty()) {
+ try {
+ ttype = MRApps.taskType(type);
+ } catch (YarnException e) {
+ throw new BadRequestException("tasktype must be either m or r");
+ }
+ }
+ if (ttype != null && task.getType() != ttype) {
+ continue;
+ }
+ allTasks.add(new TaskInfo(task));
+ }
+ return allTasks;
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/tasks/{taskid}")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public TaskInfo getJobTask(@Context HttpServletRequest hsr,
+ @PathParam("jobid") String jid, @PathParam("taskid") String tid) {
+
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ checkAccess(job, hsr);
+ Task task = getTaskFromTaskIdString(tid, job);
+ return new TaskInfo(task);
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/tasks/{taskid}/counters")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public JobTaskCounterInfo getSingleTaskCounters(
+ @Context HttpServletRequest hsr, @PathParam("jobid") String jid,
+ @PathParam("taskid") String tid) {
+
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ checkAccess(job, hsr);
+ Task task = getTaskFromTaskIdString(tid, job);
+ return new JobTaskCounterInfo(task);
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/tasks/{taskid}/attempts")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public TaskAttemptsInfo getJobTaskAttempts(@Context HttpServletRequest hsr,
+ @PathParam("jobid") String jid, @PathParam("taskid") String tid) {
+
+ init();
+ TaskAttemptsInfo attempts = new TaskAttemptsInfo();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ checkAccess(job, hsr);
+ Task task = getTaskFromTaskIdString(tid, job);
+
+ for (TaskAttempt ta : task.getAttempts().values()) {
+ if (ta != null) {
+ if (task.getType() == TaskType.REDUCE) {
+ attempts.add(new ReduceTaskAttemptInfo(ta, task.getType()));
+ } else {
+ attempts.add(new TaskAttemptInfo(ta, task.getType(), true));
+ }
+ }
+ }
+ return attempts;
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public TaskAttemptInfo getJobTaskAttemptId(@Context HttpServletRequest hsr,
+ @PathParam("jobid") String jid, @PathParam("taskid") String tid,
+ @PathParam("attemptid") String attId) {
+
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ checkAccess(job, hsr);
+ Task task = getTaskFromTaskIdString(tid, job);
+ TaskAttempt ta = getTaskAttemptFromTaskAttemptString(attId, task);
+ if (task.getType() == TaskType.REDUCE) {
+ return new ReduceTaskAttemptInfo(ta, task.getType());
+ } else {
+ return new TaskAttemptInfo(ta, task.getType(), true);
+ }
+ }
+
+ @GET
+ @Path("/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}/counters")
+ @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+ public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters(
+ @Context HttpServletRequest hsr, @PathParam("jobid") String jid,
+ @PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
+
+ init();
+ Job job = getJobFromJobIdString(jid, appCtx);
+ checkAccess(job, hsr);
+ Task task = getTaskFromTaskIdString(tid, job);
+ TaskAttempt ta = getTaskAttemptFromTaskAttemptString(attId, task);
+ return new JobTaskAttemptCounterInfo(ta);
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/App.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/App.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/App.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/App.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,54 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import com.google.inject.Inject;
+import com.google.inject.servlet.RequestScoped;
+
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+
+@RequestScoped
+public class App {
+ final AppContext context;
+ private Job job;
+ private Task task;
+
+ @Inject
+ App(AppContext ctx) {
+ context = ctx;
+ }
+
+ void setJob(Job job) {
+ this.job = job;
+ }
+
+ public Job getJob() {
+ return job;
+ }
+
+ void setTask(Task task) {
+ this.task = task;
+ }
+
+ public Task getTask() {
+ return task;
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AppController.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AppController.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AppController.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AppController.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,423 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import static org.apache.hadoop.yarn.util.StringHelper.join;
+
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.Locale;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.AppInfo;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.StringHelper;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.hadoop.yarn.webapp.View;
+
+import com.google.inject.Inject;
+
+/**
+ * This class renders the various pages that the web app supports.
+ */
+public class AppController extends Controller implements AMParams {
+ private static final Log LOG = LogFactory.getLog(AppController.class);
+
+ protected final App app;
+
+ protected AppController(App app, Configuration conf, RequestContext ctx,
+ String title) {
+ super(ctx);
+ this.app = app;
+ set(APP_ID, app.context.getApplicationID().toString());
+ set(RM_WEB, YarnConfiguration.getRMWebAppURL(conf));
+ }
+
+ @Inject
+ protected AppController(App app, Configuration conf, RequestContext ctx) {
+ this(app, conf, ctx, "am");
+ }
+
+ /**
+ * Render the default(index.html) page for the Application Controller
+ */
+ @Override public void index() {
+ setTitle(join("MapReduce Application ", $(APP_ID)));
+ }
+
+ /**
+ * Render the /info page with an overview of current application.
+ */
+ public void info() {
+ AppInfo info = new AppInfo(app, app.context);
+ info("Application Master Overview").
+ _("Application ID:", info.getId()).
+ _("Application Name:", info.getName()).
+ _("User:", info.getUser()).
+ _("Started on:", Times.format(info.getStartTime())).
+ _("Elasped: ", org.apache.hadoop.util.StringUtils.formatTime(
+ info.getElapsedTime() ));
+ render(InfoPage.class);
+ }
+
+ /**
+ * @return The class that will render the /job page
+ */
+ protected Class<? extends View> jobPage() {
+ return JobPage.class;
+ }
+
+ /**
+ * Render the /job page
+ */
+ public void job() {
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ render(jobPage());
+ }
+
+ /**
+ * @return the class that will render the /jobcounters page
+ */
+ protected Class<? extends View> countersPage() {
+ return CountersPage.class;
+ }
+
+ /**
+ * Render the /jobcounters page
+ */
+ public void jobCounters() {
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ if (app.getJob() != null) {
+ setTitle(join("Counters for ", $(JOB_ID)));
+ }
+ render(countersPage());
+ }
+
+ /**
+ * Display a page showing a task's counters
+ */
+ public void taskCounters() {
+ try {
+ requireTask();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ if (app.getTask() != null) {
+ setTitle(StringHelper.join("Counters for ", $(TASK_ID)));
+ }
+ render(countersPage());
+ }
+
+ /**
+ * @return the class that will render the /singlejobcounter page
+ */
+ protected Class<? extends View> singleCounterPage() {
+ return SingleCounterPage.class;
+ }
+
+ /**
+ * Render the /singlejobcounter page
+ * @throws IOException on any error.
+ */
+ public void singleJobCounter() throws IOException{
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ set(COUNTER_GROUP, URLDecoder.decode($(COUNTER_GROUP), "UTF-8"));
+ set(COUNTER_NAME, URLDecoder.decode($(COUNTER_NAME), "UTF-8"));
+ if (app.getJob() != null) {
+ setTitle(StringHelper.join($(COUNTER_GROUP)," ",$(COUNTER_NAME),
+ " for ", $(JOB_ID)));
+ }
+ render(singleCounterPage());
+ }
+
+ /**
+ * Render the /singletaskcounter page
+ * @throws IOException on any error.
+ */
+ public void singleTaskCounter() throws IOException{
+ try {
+ requireTask();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ set(COUNTER_GROUP, URLDecoder.decode($(COUNTER_GROUP), "UTF-8"));
+ set(COUNTER_NAME, URLDecoder.decode($(COUNTER_NAME), "UTF-8"));
+ if (app.getTask() != null) {
+ setTitle(StringHelper.join($(COUNTER_GROUP)," ",$(COUNTER_NAME),
+ " for ", $(TASK_ID)));
+ }
+ render(singleCounterPage());
+ }
+
+ /**
+ * @return the class that will render the /tasks page
+ */
+ protected Class<? extends View> tasksPage() {
+ return TasksPage.class;
+ }
+
+ /**
+ * Render the /tasks page
+ */
+ public void tasks() {
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ if (app.getJob() != null) {
+ try {
+ String tt = $(TASK_TYPE);
+ tt = tt.isEmpty() ? "All" : StringUtils.capitalize(MRApps.taskType(tt).
+ toString().toLowerCase(Locale.US));
+ setTitle(join(tt, " Tasks for ", $(JOB_ID)));
+ } catch (Exception e) {
+ LOG.error("Failed to render tasks page with task type : "
+ + $(TASK_TYPE) + " for job id : " + $(JOB_ID), e);
+ badRequest(e.getMessage());
+ }
+ }
+ render(tasksPage());
+ }
+
+ /**
+ * @return the class that will render the /task page
+ */
+ protected Class<? extends View> taskPage() {
+ return TaskPage.class;
+ }
+
+ /**
+ * Render the /task page
+ */
+ public void task() {
+ try {
+ requireTask();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ if (app.getTask() != null) {
+ setTitle(join("Attempts for ", $(TASK_ID)));
+ }
+ render(taskPage());
+ }
+
+ /**
+ * @return the class that will render the /attempts page
+ */
+ protected Class<? extends View> attemptsPage() {
+ return AttemptsPage.class;
+ }
+
+ /**
+ * Render the attempts page
+ */
+ public void attempts() {
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ if (app.getJob() != null) {
+ try {
+ String taskType = $(TASK_TYPE);
+ if (taskType.isEmpty()) {
+ throw new RuntimeException("missing task-type.");
+ }
+ String attemptState = $(ATTEMPT_STATE);
+ if (attemptState.isEmpty()) {
+ throw new RuntimeException("missing attempt-state.");
+ }
+ setTitle(join(attemptState, " ",
+ MRApps.taskType(taskType).toString(), " attempts in ", $(JOB_ID)));
+
+ render(attemptsPage());
+ } catch (Exception e) {
+ LOG.error("Failed to render attempts page with task type : "
+ + $(TASK_TYPE) + " for job id : " + $(JOB_ID), e);
+ badRequest(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * @return the page that will be used to render the /conf page
+ */
+ protected Class<? extends View> confPage() {
+ return JobConfPage.class;
+ }
+
+ /**
+ * Render the /conf page
+ */
+ public void conf() {
+ requireJob();
+ try {
+ requireJob();
+ }
+ catch (Exception e) {
+ renderText(e.getMessage());
+ return;
+ }
+ render(confPage());
+ }
+
+ /**
+ * Render a BAD_REQUEST error.
+ * @param s the error message to include.
+ */
+ void badRequest(String s) {
+ setStatus(HttpServletResponse.SC_BAD_REQUEST);
+ String title = "Bad request: ";
+ setTitle((s != null) ? join(title, s) : title);
+ }
+
+ /**
+ * Render a NOT_FOUND error.
+ * @param s the error message to include.
+ */
+ void notFound(String s) {
+ setStatus(HttpServletResponse.SC_NOT_FOUND);
+ setTitle(join("Not found: ", s));
+ }
+
+ /**
+ * Render a ACCESS_DENIED error.
+ * @param s the error message to include.
+ */
+ void accessDenied(String s) {
+ setStatus(HttpServletResponse.SC_FORBIDDEN);
+ setTitle(join("Access denied: ", s));
+ }
+
+ /**
+ * check for job access.
+ * @param job the job that is being accessed
+ * @return True if the requesting user has permission to view the job
+ */
+ boolean checkAccess(Job job) {
+ String remoteUser = request().getRemoteUser();
+ UserGroupInformation callerUGI = null;
+ if (remoteUser != null) {
+ callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ if (callerUGI != null && !job.checkAccess(callerUGI, JobACL.VIEW_JOB)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Ensure that a JOB_ID was passed into the page.
+ */
+ public void requireJob() {
+ if ($(JOB_ID).isEmpty()) {
+ badRequest("missing job ID");
+ throw new RuntimeException("Bad Request: Missing job ID");
+ }
+
+ JobId jobID = MRApps.toJobID($(JOB_ID));
+ app.setJob(app.context.getJob(jobID));
+ if (app.getJob() == null) {
+ notFound($(JOB_ID));
+ throw new RuntimeException("Not Found: " + $(JOB_ID));
+ }
+
+ /* check for acl access */
+ Job job = app.context.getJob(jobID);
+ if (!checkAccess(job)) {
+ accessDenied("User " + request().getRemoteUser() + " does not have " +
+ " permission to view job " + $(JOB_ID));
+ throw new RuntimeException("Access denied: User " +
+ request().getRemoteUser() + " does not have permission to view job " +
+ $(JOB_ID));
+ }
+ }
+
+ /**
+ * Ensure that a TASK_ID was passed into the page.
+ */
+ public void requireTask() {
+ if ($(TASK_ID).isEmpty()) {
+ badRequest("missing task ID");
+ throw new RuntimeException("missing task ID");
+ }
+
+ TaskId taskID = MRApps.toTaskID($(TASK_ID));
+ Job job = app.context.getJob(taskID.getJobId());
+ app.setJob(job);
+ if (app.getJob() == null) {
+ notFound(MRApps.toString(taskID.getJobId()));
+ throw new RuntimeException("Not Found: " + $(JOB_ID));
+ } else {
+ app.setTask(app.getJob().getTask(taskID));
+ if (app.getTask() == null) {
+ notFound($(TASK_ID));
+ throw new RuntimeException("Not Found: " + $(TASK_ID));
+ }
+ }
+ if (!checkAccess(job)) {
+ accessDenied("User " + request().getRemoteUser() + " does not have " +
+ " permission to view job " + $(JOB_ID));
+ throw new RuntimeException("Access denied: User " +
+ request().getRemoteUser() + " does not have permission to view job " +
+ $(JOB_ID));
+ }
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AppView.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AppView.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AppView.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AppView.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,59 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.TwoColumnLayout;
+
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+public class AppView extends TwoColumnLayout {
+
+ @Override protected void preHead(Page.HTML<_> html) {
+ commonPreHead(html);
+ set(DATATABLES_ID, "jobs");
+ set(initID(DATATABLES, "jobs"), jobsTableInit());
+ setTableStyles(html, "jobs");
+ }
+
+ protected void commonPreHead(Page.HTML<_> html) {
+ set(ACCORDION_ID, "nav");
+ set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
+ }
+
+ @Override
+ protected Class<? extends SubView> nav() {
+ return NavBlock.class;
+ }
+
+ @Override
+ protected Class<? extends SubView> content() {
+ return JobsBlock.class;
+ }
+
+ private String jobsTableInit() {
+ return tableInit().
+ // Sort by id upon page load
+ append(", aaSorting: [[0, 'asc']]").
+ append(",aoColumns:[{sType:'title-numeric'},").
+ append("null,null,{sType:'title-numeric', bSearchable:false},null,").
+ append("null,{sType:'title-numeric',bSearchable:false}, null, null]}").
+ toString();
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AttemptsPage.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AttemptsPage.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AttemptsPage.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/AttemptsPage.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,76 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import static org.apache.hadoop.mapreduce.v2.app2.webapp.AMParams.TASK_TYPE;
+import static org.apache.hadoop.mapreduce.v2.app2.webapp.AMParams.ATTEMPT_STATE;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
+import org.apache.hadoop.yarn.webapp.SubView;
+
+import com.google.inject.Inject;
+
+public class AttemptsPage extends TaskPage {
+ static class FewAttemptsBlock extends TaskPage.AttemptsBlock {
+ @Inject
+ FewAttemptsBlock(App ctx) {
+ super(ctx);
+ }
+
+ @Override
+ protected boolean isValidRequest() {
+ return true;
+ }
+
+ @Override
+ protected Collection<TaskAttempt> getTaskAttempts() {
+ List<TaskAttempt> fewTaskAttemps = new ArrayList<TaskAttempt>();
+ String taskTypeStr = $(TASK_TYPE);
+ TaskType taskType = MRApps.taskType(taskTypeStr);
+ String attemptStateStr = $(ATTEMPT_STATE);
+ TaskAttemptStateUI neededState = MRApps
+ .taskAttemptState(attemptStateStr);
+ for (Task task : super.app.getJob().getTasks(taskType).values()) {
+ Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
+ for (TaskAttempt attempt : attempts.values()) {
+ if (neededState.correspondsTo(attempt.getState())) {
+ fewTaskAttemps.add(attempt);
+ }
+ }
+ }
+ return fewTaskAttemps;
+ }
+ }
+
+ @Override
+ protected Class<? extends SubView> content() {
+ return FewAttemptsBlock.class;
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/ConfBlock.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/ConfBlock.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/ConfBlock.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/ConfBlock.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,120 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import static org.apache.hadoop.mapreduce.v2.app2.webapp.AMParams.JOB_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.ConfEntryInfo;
+import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.ConfInfo;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
+import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+/**
+ * Render the configuration for this job.
+ */
+public class ConfBlock extends HtmlBlock {
+ final AppContext appContext;
+
+ @Inject ConfBlock(AppContext appctx) {
+ appContext = appctx;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.hadoop.yarn.webapp.view.HtmlBlock#render(org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block)
+ */
+ @Override protected void render(Block html) {
+ String jid = $(JOB_ID);
+ if (jid.isEmpty()) {
+ html.
+ p()._("Sorry, can't do anything without a JobID.")._();
+ return;
+ }
+ JobId jobID = MRApps.toJobID(jid);
+ Job job = appContext.getJob(jobID);
+ if (job == null) {
+ html.
+ p()._("Sorry, ", jid, " not found.")._();
+ return;
+ }
+ Path confPath = job.getConfFile();
+ try {
+ ConfInfo info = new ConfInfo(job);
+
+ html.div().h3(confPath.toString())._();
+ TBODY<TABLE<Hamlet>> tbody = html.
+ // Tasks table
+ table("#conf").
+ thead().
+ tr().
+ th(_TH, "key").
+ th(_TH, "value").
+ th(_TH, "source chain").
+ _().
+ _().
+ tbody();
+ for (ConfEntryInfo entry : info.getProperties()) {
+ StringBuffer buffer = new StringBuffer();
+ String[] sources = entry.getSource();
+ //Skip the last entry, because it is always the same HDFS file, and
+ // output them in reverse order so most recent is output first
+ boolean first = true;
+ for(int i = (sources.length - 2); i >= 0; i--) {
+ if(!first) {
+ // \u2B05 is an arrow <--
+ buffer.append(" \u2B05 ");
+ }
+ first = false;
+ buffer.append(sources[i]);
+ }
+ tbody.
+ tr().
+ td(entry.getName()).
+ td(entry.getValue()).
+ td(buffer.toString()).
+ _();
+ }
+ tbody._().
+ tfoot().
+ tr().
+ th().input("search_init").$type(InputType.text).$name("key").$value("key")._()._().
+ th().input("search_init").$type(InputType.text).$name("value").$value("value")._()._().
+ th().input("search_init").$type(InputType.text).$name("source chain").$value("source chain")._()._().
+ _().
+ _().
+ _();
+ } catch(IOException e) {
+ LOG.error("Error while reading "+confPath, e);
+ html.p()._("Sorry got an error while reading conf file. ",confPath);
+ }
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/CountersBlock.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/CountersBlock.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/CountersBlock.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/CountersBlock.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,208 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import static org.apache.hadoop.mapreduce.v2.app2.webapp.AMParams.JOB_ID;
+import static org.apache.hadoop.mapreduce.v2.app2.webapp.AMParams.TASK_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_TABLE;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+import com.google.inject.Inject;
+
+public class CountersBlock extends HtmlBlock {
+ Job job;
+ Task task;
+ Counters total;
+ Counters map;
+ Counters reduce;
+
+ @Inject CountersBlock(AppContext appCtx, ViewContext ctx) {
+ super(ctx);
+ getCounters(appCtx);
+ }
+
+ @Override protected void render(Block html) {
+ if (job == null) {
+ html.
+ p()._("Sorry, no counters for nonexistent", $(JOB_ID, "job"))._();
+ return;
+ }
+ if (!$(TASK_ID).isEmpty() && task == null) {
+ html.
+ p()._("Sorry, no counters for nonexistent", $(TASK_ID, "task"))._();
+ return;
+ }
+
+ if(total == null || total.getGroupNames() == null || total.countCounters() == 0) {
+ String type = $(TASK_ID);
+ if(type == null || type.isEmpty()) {
+ type = $(JOB_ID, "the job");
+ }
+ html.
+ p()._("Sorry it looks like ",type," has no counters.")._();
+ return;
+ }
+
+ String urlBase;
+ String urlId;
+ if(task != null) {
+ urlBase = "singletaskcounter";
+ urlId = MRApps.toString(task.getID());
+ } else {
+ urlBase = "singlejobcounter";
+ urlId = MRApps.toString(job.getID());
+ }
+
+
+ int numGroups = 0;
+ TBODY<TABLE<DIV<Hamlet>>> tbody = html.
+ div(_INFO_WRAP).
+ table("#counters").
+ thead().
+ tr().
+ th(".group.ui-state-default", "Counter Group").
+ th(".ui-state-default", "Counters")._()._().
+ tbody();
+ for (CounterGroup g : total) {
+ CounterGroup mg = map == null ? null : map.getGroup(g.getName());
+ CounterGroup rg = reduce == null ? null : reduce.getGroup(g.getName());
+ ++numGroups;
+ // This is mostly for demonstration :) Typically we'd introduced
+ // a CounterGroup block to reduce the verbosity. OTOH, this
+ // serves as an indicator of where we're in the tag hierarchy.
+ TR<THEAD<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>>> groupHeadRow = tbody.
+ tr().
+ th().$title(g.getName()).$class("ui-state-default").
+ _(fixGroupDisplayName(g.getDisplayName()))._().
+ td().$class(C_TABLE).
+ table(".dt-counters").
+ thead().
+ tr().th(".name", "Name");
+
+ if (map != null) {
+ groupHeadRow.th("Map").th("Reduce");
+ }
+ // Ditto
+ TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>> group = groupHeadRow.
+ th(map == null ? "Value" : "Total")._()._().
+ tbody();
+ for (Counter counter : g) {
+ // Ditto
+ TR<TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>>> groupRow = group.
+ tr();
+ if (task == null && mg == null && rg == null) {
+ groupRow.td().$title(counter.getName())._(counter.getDisplayName()).
+ _();
+ } else {
+ groupRow.td().$title(counter.getName()).
+ a(url(urlBase,urlId,g.getName(),
+ counter.getName()), counter.getDisplayName()).
+ _();
+ }
+ if (map != null) {
+ Counter mc = mg == null ? null : mg.findCounter(counter.getName());
+ Counter rc = rg == null ? null : rg.findCounter(counter.getName());
+ groupRow.
+ td(mc == null ? "0" : String.valueOf(mc.getValue())).
+ td(rc == null ? "0" : String.valueOf(rc.getValue()));
+ }
+ groupRow.td(String.valueOf(counter.getValue()))._();
+ }
+ group._()._()._()._();
+ }
+ tbody._()._()._();
+ }
+
+ private void getCounters(AppContext ctx) {
+ JobId jobID = null;
+ TaskId taskID = null;
+ String tid = $(TASK_ID);
+ if (!tid.isEmpty()) {
+ taskID = MRApps.toTaskID(tid);
+ jobID = taskID.getJobId();
+ } else {
+ String jid = $(JOB_ID);
+ if (jid != null && !jid.isEmpty()) {
+ jobID = MRApps.toJobID(jid);
+ }
+ }
+ if (jobID == null) {
+ return;
+ }
+ job = ctx.getJob(jobID);
+ if (job == null) {
+ return;
+ }
+ if (taskID != null) {
+ task = job.getTask(taskID);
+ if (task == null) {
+ return;
+ }
+ total = task.getCounters();
+ return;
+ }
+ // Get all types of counters
+ Map<TaskId, Task> tasks = job.getTasks();
+ total = job.getAllCounters();
+ boolean needTotalCounters = false;
+ if (total == null) {
+ total = new Counters();
+ needTotalCounters = true;
+ }
+ map = new Counters();
+ reduce = new Counters();
+ for (Task t : tasks.values()) {
+ Counters counters = t.getCounters();
+ if (counters == null) {
+ continue;
+ }
+ switch (t.getType()) {
+ case MAP: map.incrAllCounters(counters); break;
+ case REDUCE: reduce.incrAllCounters(counters); break;
+ }
+ if (needTotalCounters) {
+ total.incrAllCounters(counters);
+ }
+ }
+ }
+
+ private String fixGroupDisplayName(CharSequence name) {
+ return name.toString().replace(".", ".\u200B").replace("$", "\u200B$");
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/CountersPage.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/CountersPage.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/CountersPage.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/CountersPage.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,54 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import org.apache.hadoop.yarn.webapp.SubView;
+
+import static org.apache.hadoop.mapreduce.v2.app2.webapp.AMParams.TASK_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+
+public class CountersPage extends AppView {
+
+ @Override protected void preHead(Page.HTML<_> html) {
+ commonPreHead(html);
+
+ String tid = $(TASK_ID);
+ String activeNav = "3";
+ if(tid == null || tid.isEmpty()) {
+ activeNav = "2";
+ }
+ set(initID(ACCORDION, "nav"), "{autoHeight:false, active:"+activeNav+"}");
+ set(DATATABLES_SELECTOR, "#counters .dt-counters");
+ set(initSelector(DATATABLES),
+ "{bJQueryUI:true, sDom:'t', iDisplayLength:-1}");
+ }
+
+ @Override protected void postHead(Page.HTML<_> html) {
+ html.
+ style("#counters, .dt-counters { table-layout: fixed }",
+ "#counters th { overflow: hidden; vertical-align: middle }",
+ "#counters .dataTables_wrapper { min-height: 1em }",
+ "#counters .group { width: 15em }",
+ "#counters .name { width: 30em }");
+ }
+
+ @Override protected Class<? extends SubView> content() {
+ return CountersBlock.class;
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/InfoPage.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/InfoPage.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/InfoPage.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/InfoPage.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+
+import org.apache.hadoop.yarn.webapp.SubView;
+import org.apache.hadoop.yarn.webapp.view.InfoBlock;
+
+public class InfoPage extends AppView {
+
+ @Override protected void preHead(Page.HTML<_> html) {
+ commonPreHead(html);
+ setTitle("About the Application Master");
+ }
+
+ @Override protected Class<? extends SubView> content() {
+ return InfoBlock.class;
+ }
+}