You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/03 22:21:47 UTC
git commit: TEZ-718. Remove some unused classes - JobEndNotifier and
Speculation. (Mohammad Kamrul Islam via sseth)
Updated Branches:
refs/heads/master 00aecc826 -> 29a97cc1f
TEZ-718. Remove some unused classes - JobEndNotifier and Speculation.
(Mohammad Kamrul Islam via sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/29a97cc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/29a97cc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/29a97cc1
Branch: refs/heads/master
Commit: 29a97cc1f22a2fbb06ecda4df48d1342621e3391
Parents: 00aecc8
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 3 13:21:00 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 3 13:21:00 2014 -0800
----------------------------------------------------------------------
.../org/apache/tez/dag/app/JobEndNotifier.java | 187 -------
.../tez/dag/app/dag/event/VertexEventType.java | 3 -
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 28 --
.../tez/dag/app/speculate/DataStatistics.java | 78 ---
.../dag/app/speculate/DefaultSpeculator.java | 504 -------------------
...ponentiallySmoothedTaskRuntimeEstimator.java | 192 -------
.../speculate/LegacyTaskRuntimeEstimator.java | 145 ------
.../app/speculate/NullTaskRuntimesEngine.java | 72 ---
.../tez/dag/app/speculate/Speculator.java | 45 --
.../tez/dag/app/speculate/SpeculatorEvent.java | 86 ----
.../dag/app/speculate/StartEndTimesBase.java | 195 -------
.../dag/app/speculate/TaskRuntimeEstimator.java | 90 ----
.../app/speculate/TaskSpeculationPredicate.java | 38 --
.../tez/dag/app/speculate/package-info.java | 20 -
.../tez/dag/app/dag/impl/TestTaskImpl.java | 26 -
.../tez/mapreduce/hadoop/MRJobConfig.java | 34 --
16 files changed, 1743 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java
deleted file mode 100644
index 59ed685..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/JobEndNotifier.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.Proxy;
-import java.net.URL;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.mortbay.log.Log;
-
-/**
- * <p>This class handles job end notification. Submitters of jobs can choose to
- * be notified of the end of a job by supplying a URL to which a connection
- * will be established.
- * <ul><li> The URL connection is fire and forget by default.</li> <li>
- * User can specify number of retry attempts and a time interval at which to
- * attempt retries</li><li>
- * Cluster administrators can set final parameters to set maximum number of
- * tries (0 would disable job end notification) and max time interval and a
- * proxy if needed</li><li>
- * The URL may contain sentinels which will be replaced by jobId and jobStatus
- * (eg. SUCCEEDED/KILLED/FAILED) </li> </ul>
- * </p>
- */
-public class JobEndNotifier implements Configurable {
- private static final String JOB_ID = "$jobId";
- private static final String JOB_STATUS = "$jobStatus";
-
- private Configuration conf;
- protected String userUrl;
- protected String proxyConf;
- protected int numTries; //Number of tries to attempt notification
- protected int waitInterval; //Time to wait between retrying notification
- protected URL urlToNotify; //URL to notify read from the config
- protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
-
- /**
- * Parse the URL that needs to be notified of the end of the job, along
- * with the number of retries in case of failure, the amount of time to
- * wait between retries and proxy settings
- * @param conf the configuration
- */
- public void setConf(Configuration conf) {
- this.conf = conf;
-
- numTries = Math.min(
- conf.getInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1
- , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1)
- );
- waitInterval = Math.min(
- conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5)
- , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5)
- );
- waitInterval = (waitInterval < 0) ? 5 : waitInterval;
-
- userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
-
- proxyConf = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY);
-
- //Configure the proxy to use if its set. It should be set like
- //proxyType@proxyHostname:port
- if(proxyConf != null && !proxyConf.equals("") &&
- proxyConf.lastIndexOf(":") != -1) {
- int typeIndex = proxyConf.indexOf("@");
- Proxy.Type proxyType = Proxy.Type.HTTP;
- if(typeIndex != -1 &&
- proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
- proxyType = Proxy.Type.SOCKS;
- }
- String hostname = proxyConf.substring(typeIndex + 1,
- proxyConf.lastIndexOf(":"));
- String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
- try {
- int port = Integer.parseInt(portConf);
- proxyToUse = new Proxy(proxyType,
- new InetSocketAddress(hostname, port));
- Log.info("Job end notification using proxy type \"" + proxyType +
- "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
- } catch(NumberFormatException nfe) {
- Log.warn("Job end notification couldn't parse configured proxy's port "
- + portConf + ". Not going to use a proxy");
- }
- }
-
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * Notify the URL just once. Use best effort. Timeout hard coded to 5
- * seconds.
- */
- protected boolean notifyURLOnce() {
- boolean success = false;
- try {
- Log.info("Job end notification trying " + urlToNotify);
- HttpURLConnection conn =
- (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
- conn.setConnectTimeout(5*1000);
- conn.setReadTimeout(5*1000);
- conn.setAllowUserInteraction(false);
- if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
- Log.warn("Job end notification to " + urlToNotify +" failed with code: "
- + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
- +"\"");
- }
- else {
- success = true;
- Log.info("Job end notification to " + urlToNotify + " succeeded");
- }
- } catch(IOException ioe) {
- Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
- }
- return success;
- }
-
- /**
- * Notify a server of the completion of a submitted job. The user must have
- * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
- * @param jobReport JobReport used to read JobId and JobStatus
- * @throws InterruptedException
- */
- public void notify(JobReport jobReport)
- throws InterruptedException {
- // Do we need job-end notification?
- if (userUrl == null) {
- Log.info("Job end notification URL not set, skipping.");
- return;
- }
-
- //Do string replacements for jobId and jobStatus
- if (userUrl.contains(JOB_ID)) {
- userUrl = userUrl.replace(JOB_ID, jobReport.getJobId().toString());
- }
- if (userUrl.contains(JOB_STATUS)) {
- userUrl = userUrl.replace(JOB_STATUS, jobReport.getJobState().toString());
- }
-
- // Create the URL, ensure sanity
- try {
- urlToNotify = new URL(userUrl);
- } catch (MalformedURLException mue) {
- Log.warn("Job end notification couldn't parse " + userUrl, mue);
- return;
- }
-
- // Send notification
- boolean success = false;
- while (numTries-- > 0 && !success) {
- Log.info("Job end notification attempts left " + numTries);
- success = notifyURLOnce();
- if (!success) {
- Thread.sleep(waitInterval);
- }
- }
- if (!success) {
- Log.warn("Job end notification failed to notify : " + urlToNotify);
- } else {
- Log.info("Job end notification succeeded for " + jobReport.getJobId());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index fccfe91..0cf14eb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -35,9 +35,6 @@ public enum VertexEventType {
V_SOURCE_TASK_ATTEMPT_COMPLETED,
V_SOURCE_VERTEX_STARTED,
- //Producer:Speculator
- V_ADD_SPEC_ATTEMPT,
-
//Producer:Task
V_TASK_COMPLETED,
V_TASK_RESCHEDULED,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index d4012f2..53ba8ad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -734,20 +734,6 @@ public class TaskAttemptImpl implements TaskAttempt,
// }
}
-// private void maybeSendSpeculatorContainerRequired() {
-// if (!speculatorContainerRequestSent) {
-// sendEvent(new SpeculatorEvent(getID().getTaskID(), +1));
-// speculatorContainerRequestSent = true;
-// }
-// }
-//
-// private void maybeSendSpeculatorContainerNoLongerRequired() {
-// if (speculatorContainerRequestSent) {
-// sendEvent(new SpeculatorEvent(getID().getTaskID(), -1));
-// speculatorContainerRequestSent = false;
-// }
-// }
-
private void sendTaskAttemptCleanupEvent() {
// TaskAttemptContext taContext =
// new TaskAttemptContextImpl(this.conf,
@@ -821,8 +807,6 @@ public class TaskAttemptImpl implements TaskAttempt,
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
- // Event to speculator - containerNeeded++
- //ta.maybeSendSpeculatorContainerRequired();
// TODO Creating the remote task here may not be required in case of
// recovery.
@@ -965,11 +949,6 @@ public class TaskAttemptImpl implements TaskAttempt,
}
}
- // Inform the speculator about the container assignment.
- //ta.maybeSendSpeculatorContainerNoLongerRequired();
- // Inform speculator about startTime
- //ta.sendEvent(new SpeculatorEvent(ta.attemptId, true, ta.launchTime));
-
// Inform the Task
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
TaskEventType.T_ATTEMPT_LAUNCHED));
@@ -998,8 +977,6 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
.getTaskAttemptState()));
}
- // Decrement speculator container request.
- //ta.maybeSendSpeculatorContainerNoLongerRequired();
}
}
@@ -1057,9 +1034,6 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.reportedStatus.progress = statusEvent.getProgress();
ta.reportedStatus.counters = statusEvent.getCounters();
- // Inform speculator of status.
- //ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
-
ta.updateProgressSplits();
}
@@ -1083,8 +1057,6 @@ public class TaskAttemptImpl implements TaskAttempt,
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
ta.setFinishTime();
- // Inform the speculator.
- //ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.finishTime));
// Send out history event.
ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
deleted file mode 100644
index 48a131a..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DataStatistics.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-public class DataStatistics {
- private int count = 0;
- private double sum = 0;
- private double sumSquares = 0;
-
- public DataStatistics() {
- }
-
- public DataStatistics(double initNum) {
- this.count = 1;
- this.sum = initNum;
- this.sumSquares = initNum * initNum;
- }
-
- public synchronized void add(double newNum) {
- this.count++;
- this.sum += newNum;
- this.sumSquares += newNum * newNum;
- }
-
- public synchronized void updateStatistics(double old, double update) {
- this.sum += update - old;
- this.sumSquares += (update * update) - (old * old);
- }
-
- public synchronized double mean() {
- return count == 0 ? 0.0 : sum/count;
- }
-
- public synchronized double var() {
- // E(X^2) - E(X)^2
- if (count <= 1) {
- return 0.0;
- }
- double mean = mean();
- return Math.max((sumSquares/count) - mean * mean, 0.0d);
- }
-
- public synchronized double std() {
- return Math.sqrt(this.var());
- }
-
- public synchronized double outlier(float sigma) {
- if (count != 0.0) {
- return mean() + std() * sigma;
- }
-
- return 0.0;
- }
-
- public synchronized double count() {
- return count;
- }
-
- public String toString() {
- return "DataStatistics: count is " + count + ", sum is " + sum +
- ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
deleted file mode 100644
index 24948c0..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/DefaultSpeculator.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.app.dag.event.TaskEvent;
-import org.apache.tez.dag.app.dag.event.TaskEventType;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.utils.TezBuilderUtils;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-// FIXME does not handle multiple vertices
-public class DefaultSpeculator extends AbstractService implements
- Speculator {
-
- private static final long ON_SCHEDULE = Long.MIN_VALUE;
- private static final long ALREADY_SPECULATING = Long.MIN_VALUE + 1;
- private static final long TOO_NEW = Long.MIN_VALUE + 2;
- private static final long PROGRESS_IS_GOOD = Long.MIN_VALUE + 3;
- private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
- private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
-
- private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
- private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
-
- private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
- private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
- private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
-
- private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
-
- private final ConcurrentMap<TezTaskID, Boolean> runningTasks
- = new ConcurrentHashMap<TezTaskID, Boolean>();
-
- private final Map<Task, AtomicBoolean> pendingSpeculations
- = new ConcurrentHashMap<Task, AtomicBoolean>();
-
- // These are the current needs, not the initial needs. For each job, these
- // record the number of attempts that exist and that are actively
- // waiting for a container [as opposed to running or finished]
- // TODO handle multiple dags
- private final ConcurrentMap<TezVertexID, AtomicInteger> vertexContainerNeeds
- = new ConcurrentHashMap<TezVertexID, AtomicInteger>();
-
- private final Set<TezTaskID> mayHaveSpeculated = new HashSet<TezTaskID>();
-
- private final Configuration conf;
- private AppContext context;
- private Thread speculationBackgroundThread = null;
- private BlockingQueue<SpeculatorEvent> eventQueue
- = new LinkedBlockingQueue<SpeculatorEvent>();
- private TaskRuntimeEstimator estimator;
-
- private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
-
- private final Clock clock;
-
- private final EventHandler<TaskEvent> eventHandler;
-
- public DefaultSpeculator(Configuration conf, AppContext context) {
- this(conf, context, context.getClock());
- }
-
- public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
- this(conf, context, getEstimator(conf, context), clock);
- }
-
- static private TaskRuntimeEstimator getEstimator
- (Configuration conf, AppContext context) {
- TaskRuntimeEstimator estimator;
-
- try {
- // "yarn.mapreduce.job.task.runtime.estimator.class"
- Class<? extends TaskRuntimeEstimator> estimatorClass
- = conf.getClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
- LegacyTaskRuntimeEstimator.class,
- TaskRuntimeEstimator.class);
-
- Constructor<? extends TaskRuntimeEstimator> estimatorConstructor
- = estimatorClass.getConstructor();
-
- estimator = estimatorConstructor.newInstance();
-
- estimator.contextualize(conf, context);
- } catch (InstantiationException ex) {
- LOG.error("Can't make a speculation runtime extimator", ex);
- throw new TezUncheckedException(ex);
- } catch (IllegalAccessException ex) {
- LOG.error("Can't make a speculation runtime extimator", ex);
- throw new TezUncheckedException(ex);
- } catch (InvocationTargetException ex) {
- LOG.error("Can't make a speculation runtime extimator", ex);
- throw new TezUncheckedException(ex);
- } catch (NoSuchMethodException ex) {
- LOG.error("Can't make a speculation runtime extimator", ex);
- throw new TezUncheckedException(ex);
- }
-
- return estimator;
- }
-
- // This constructor is designed to be called by other constructors.
- // However, it's public because we do use it in the test cases.
- // Normally we figure out our own estimator.
- public DefaultSpeculator
- (Configuration conf, AppContext context,
- TaskRuntimeEstimator estimator, Clock clock) {
- super(DefaultSpeculator.class.getName());
-
- this.conf = conf;
- this.context = context;
- this.estimator = estimator;
- this.clock = clock;
- this.eventHandler = context.getEventHandler();
- }
-
-/* ************************************************************* */
-
- // This is the task-mongering that creates the two new threads -- one for
- // processing events from the event queue and one for periodically
- // looking for speculation opportunities
-
- @Override
- public void serviceStart() {
- Runnable speculationBackgroundCore
- = new Runnable() {
- @Override
- public void run() {
- while (!Thread.currentThread().isInterrupted()) {
- long backgroundRunStartTime = clock.getTime();
- try {
- int speculations = computeSpeculations();
- long mininumRecomp
- = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
- : SOONEST_RETRY_AFTER_NO_SPECULATE;
-
- long wait = Math.max(mininumRecomp,
- clock.getTime() - backgroundRunStartTime);
-
- if (speculations > 0) {
- LOG.info("We launched " + speculations
- + " speculations. Sleeping " + wait + " milliseconds.");
- }
-
- Object pollResult
- = scanControl.poll(wait, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.error("Background thread returning, interrupted : " + e);
- e.printStackTrace(System.out);
- return;
- }
- }
- }
- };
- speculationBackgroundThread = new Thread
- (speculationBackgroundCore, "DefaultSpeculator background processing");
- speculationBackgroundThread.start();
- }
-
- @Override
- public void serviceStop() {
- // this could be called before background thread is established
- if (speculationBackgroundThread != null) {
- speculationBackgroundThread.interrupt();
- }
- }
-
- @Override
- public void handleAttempt(TaskAttemptStatusOld status) {
- long timestamp = clock.getTime();
- statusUpdate(status, timestamp);
- }
-
- // This section is not part of the Speculator interface; it's used only for
- // testing
- public boolean eventQueueEmpty() {
- return eventQueue.isEmpty();
- }
-
- // This interface is intended to be used only for test cases.
- public void scanForSpeculations() {
- LOG.info("We got asked to run a debug speculation scan.");
- // debug
- System.out.println("We got asked to run a debug speculation scan.");
- System.out.println("There are " + scanControl.size()
- + " events stacked already.");
- scanControl.add(new Object());
- Thread.yield();
- }
-
-
-/* ************************************************************* */
-
- // This section contains the code that gets run for a SpeculatorEvent
-
- private AtomicInteger containerNeed(TezTaskID taskID) {
- TezVertexID vId = taskID.getVertexID();
-
- AtomicInteger result = vertexContainerNeeds.get(vId);
-
- if (result == null) {
- vertexContainerNeeds.putIfAbsent(vId, new AtomicInteger(0));
- result = vertexContainerNeeds.get(vId);
- }
-
- return result;
- }
-
- private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
- switch (event.getType()) {
- case ATTEMPT_STATUS_UPDATE:
- statusUpdate(event.getReportedStatus(), event.getTimestamp());
- break;
-
- case TASK_CONTAINER_NEED_UPDATE:
- {
- AtomicInteger need = containerNeed(event.getTaskID());
- need.addAndGet(event.containersNeededChange());
- break;
- }
-
- case ATTEMPT_START:
- {
- LOG.info("ATTEMPT_START " + event.getTaskID());
- estimator.enrollAttempt
- (event.getReportedStatus(), event.getTimestamp());
- break;
- }
-
- case JOB_CREATE:
- {
- LOG.info("JOB_CREATE " + event.getJobID());
- estimator.contextualize(getConfig(), context);
- break;
- }
- }
- }
-
- /**
- * Absorbs one TaskAttemptStatus
- *
- * @param reportedStatus the status report that we got from a task attempt
- * that we want to fold into the speculation data for this job
- * @param timestamp the time this status corresponds to. This matters
- * because statuses contain progress.
- */
- protected void statusUpdate(TaskAttemptStatusOld reportedStatus, long timestamp) {
-
- String stateString = reportedStatus.taskState.toString();
-
- TezTaskAttemptID attemptID = reportedStatus.id;
- TezTaskID taskID = attemptID.getTaskID();
- DAG job = context.getCurrentDAG();
-
- if (job == null) {
- return;
- }
-
- Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-
- if (task == null) {
- return;
- }
-
- estimator.updateAttempt(reportedStatus, timestamp);
-
- // If the task is already known to be speculation-bait, don't do anything
- if (pendingSpeculations.get(task) != null) {
- if (pendingSpeculations.get(task).get()) {
- return;
- }
- }
-
- if (stateString.equals(TaskAttemptState.RUNNING.name())) {
- runningTasks.putIfAbsent(taskID, Boolean.TRUE);
- } else {
- runningTasks.remove(taskID, Boolean.TRUE);
- }
- }
-
-/* ************************************************************* */
-
-// This is the code section that runs periodically and adds speculations for
-// those jobs that need them.
-
-
- // This can return a few magic values for tasks that shouldn't speculate:
- // returns ON_SCHEDULE if thresholdRuntime(taskID) says that we should not
- // considering speculating this task
- // returns ALREADY_SPECULATING if that is true. This has priority.
- // returns TOO_NEW if our companion task hasn't gotten any information
- // returns PROGRESS_IS_GOOD if the task is sailing through
- // returns NOT_RUNNING if the task is not running
- //
- // All of these values are negative. Any value that should be allowed to
- // speculate is 0 or positive.
- private long speculationValue(TezTaskID taskID, long now) {
- DAG job = context.getCurrentDAG();
- Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
- Map<TezTaskAttemptID, TaskAttempt> attempts = task.getAttempts();
- long acceptableRuntime = Long.MIN_VALUE;
- long result = Long.MIN_VALUE;
-
- if (!mayHaveSpeculated.contains(taskID)) {
- acceptableRuntime = estimator.thresholdRuntime(taskID);
- if (acceptableRuntime == Long.MAX_VALUE) {
- return ON_SCHEDULE;
- }
- }
-
- TezTaskAttemptID runningTaskAttemptID = null;
-
- int numberRunningAttempts = 0;
-
- for (TaskAttempt taskAttempt : attempts.values()) {
- if (taskAttempt.getState() == TaskAttemptState.RUNNING
- || taskAttempt.getState() == TaskAttemptState.STARTING) {
- if (++numberRunningAttempts > 1) {
- return ALREADY_SPECULATING;
- }
- runningTaskAttemptID = taskAttempt.getID();
-
- long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);
-
- long taskAttemptStartTime
- = estimator.attemptEnrolledTime(runningTaskAttemptID);
- if (taskAttemptStartTime > now) {
- // This background process ran before we could process the task
- // attempt status change that chronicles the attempt start
- return TOO_NEW;
- }
-
- long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;
-
- long estimatedReplacementEndTime
- = now + estimator.estimatedNewAttemptRuntime(taskID);
-
- if (estimatedEndTime < now) {
- return PROGRESS_IS_GOOD;
- }
-
- if (estimatedReplacementEndTime >= estimatedEndTime) {
- return TOO_LATE_TO_SPECULATE;
- }
-
- result = estimatedEndTime - estimatedReplacementEndTime;
- }
- }
-
- // If we are here, there's at most one task attempt.
- if (numberRunningAttempts == 0) {
- return NOT_RUNNING;
- }
-
-
-
- if (acceptableRuntime == Long.MIN_VALUE) {
- acceptableRuntime = estimator.thresholdRuntime(taskID);
- if (acceptableRuntime == Long.MAX_VALUE) {
- return ON_SCHEDULE;
- }
- }
-
- return result;
- }
-
- //Add attempt to a given Task.
- protected void addSpeculativeAttempt(TezTaskID taskID) {
- LOG.info
- ("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
- eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT));
- mayHaveSpeculated.add(taskID);
- }
-
- @Override
- public void handle(SpeculatorEvent event) {
- processSpeculatorEvent(event);
- }
-
-
- private int maybeScheduleAMapSpeculation() {
- return maybeScheduleASpeculation(0);
- }
-
- private int maybeScheduleAReduceSpeculation() {
- return maybeScheduleASpeculation(1);
- }
-
- private int maybeScheduleASpeculation(int vertexId) {
- int successes = 0;
-
- long now = clock.getTime();
-
- // FIXME this needs to be fixed for a DAG
- // TODO handle multiple dags
- for (ConcurrentMap.Entry<TezVertexID, AtomicInteger> vertexEntry :
- vertexContainerNeeds.entrySet()) {
- // This race conditon is okay. If we skip a speculation attempt we
- // should have tried because the event that lowers the number of
- // containers needed to zero hasn't come through, it will next time.
- // Also, if we miss the fact that the number of containers needed was
- // zero but increased due to a failure it's not too bad to launch one
- // container prematurely.
- if (vertexEntry.getValue().get() > 0) {
- continue;
- }
-
- int numberSpeculationsAlready = 0;
- int numberRunningTasks = 0;
-
- // loop through the tasks of the kind
- DAG job = context.getCurrentDAG();
-
- Map<TezTaskID, Task> tasks =
- job.getVertex(TezBuilderUtils.newVertexID(job.getID(), vertexId)).getTasks();
-
- int numberAllowedSpeculativeTasks
- = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
- PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
-
- TezTaskID bestTaskID = null;
- long bestSpeculationValue = -1L;
-
- // this loop is potentially pricey.
- // TODO track the tasks that are potentially worth looking at
- for (Map.Entry<TezTaskID, Task> taskEntry : tasks.entrySet()) {
- long mySpeculationValue = speculationValue(taskEntry.getKey(), now);
-
- if (mySpeculationValue == ALREADY_SPECULATING) {
- ++numberSpeculationsAlready;
- }
-
- if (mySpeculationValue != NOT_RUNNING) {
- ++numberRunningTasks;
- }
-
- if (mySpeculationValue > bestSpeculationValue) {
- bestTaskID = taskEntry.getKey();
- bestSpeculationValue = mySpeculationValue;
- }
- }
- numberAllowedSpeculativeTasks
- = (int) Math.max(numberAllowedSpeculativeTasks,
- PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
-
- // If we found a speculation target, fire it off
- if (bestTaskID != null
- && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
- addSpeculativeAttempt(bestTaskID);
- ++successes;
- }
- }
-
- return successes;
- }
-
- private int computeSpeculations() {
- // We'll try to issue one map and one reduce speculation per job per run
- return maybeScheduleAMapSpeculation() + maybeScheduleAReduceSpeculation();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
deleted file mode 100644
index 10f217d..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-/**
- * This estimator exponentially smooths the rate of progress versus wallclock
- * time. Conceivably we could write an estimator that smooths time per
- * unit progress, and get different results.
- */
-public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
-
- private final ConcurrentMap<TezTaskAttemptID, AtomicReference<EstimateVector>> estimates
- = new ConcurrentHashMap<TezTaskAttemptID, AtomicReference<EstimateVector>>();
-
- private SmoothedValue smoothedValue;
-
- private long lambda;
-
- public enum SmoothedValue {
- RATE, TIME_PER_UNIT_PROGRESS
- }
-
- ExponentiallySmoothedTaskRuntimeEstimator
- (long lambda, SmoothedValue smoothedValue) {
- super();
- this.smoothedValue = smoothedValue;
- this.lambda = lambda;
- }
-
- public ExponentiallySmoothedTaskRuntimeEstimator() {
- super();
- }
-
- // immutable
- private class EstimateVector {
- final double value;
- final float basedOnProgress;
- final long atTime;
-
- EstimateVector(double value, float basedOnProgress, long atTime) {
- this.value = value;
- this.basedOnProgress = basedOnProgress;
- this.atTime = atTime;
- }
-
- EstimateVector incorporate(float newProgress, long newAtTime) {
- if (newAtTime <= atTime || newProgress < basedOnProgress) {
- return this;
- }
-
- double oldWeighting
- = value < 0.0
- ? 0.0 : Math.exp(((double) (newAtTime - atTime)) / lambda);
-
- double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime);
-
- if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
- newRead = 1.0 / newRead;
- }
-
- return new EstimateVector
- (value * oldWeighting + newRead * (1.0 - oldWeighting),
- newProgress, newAtTime);
- }
- }
-
- private void incorporateReading
- (TezTaskAttemptID attemptID, float newProgress, long newTime) {
- //TODO: Refactor this method, it seems more complicated than necessary.
- AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
-
- if (vectorRef == null) {
- estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null));
- incorporateReading(attemptID, newProgress, newTime);
- return;
- }
-
- EstimateVector oldVector = vectorRef.get();
-
- if (oldVector == null) {
- if (vectorRef.compareAndSet(null,
- new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) {
- return;
- }
-
- incorporateReading(attemptID, newProgress, newTime);
- return;
- }
-
- while (!vectorRef.compareAndSet
- (oldVector, oldVector.incorporate(newProgress, newTime))) {
- oldVector = vectorRef.get();
- }
- }
-
- private EstimateVector getEstimateVector(TezTaskAttemptID attemptID) {
- AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
-
- if (vectorRef == null) {
- return null;
- }
-
- return vectorRef.get();
- }
-
- @Override
- public void contextualize(Configuration conf, AppContext context) {
- super.contextualize(conf, context);
-
- lambda
- = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
- MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
- smoothedValue
- = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
- ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
- }
-
- @Override
- public long estimatedRuntime(TezTaskAttemptID id) {
- Long startTime = (Long) startTimes.get(id);
-
- if (startTime == null) {
- return -1L;
- }
-
- EstimateVector vector = getEstimateVector(id);
-
- if (vector == null) {
- return -1L;
- }
-
- long sunkTime = vector.atTime - startTime;
-
- double value = vector.value;
- float progress = vector.basedOnProgress;
-
- if (value == 0) {
- return -1L;
- }
-
- double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
-
- if (rate == 0.0) {
- return -1L;
- }
-
- double remainingTime = (1.0 - progress) / rate;
-
- return sunkTime + (long)remainingTime;
- }
-
- @Override
- public long runtimeEstimateVariance(TezTaskAttemptID id) {
- return -1L;
- }
-
- @Override
- public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
- super.updateAttempt(status, timestamp);
- TezTaskAttemptID attemptID = status.id;
-
- float progress = status.progress;
-
- incorporateReading(attemptID, progress, timestamp);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
deleted file mode 100644
index ff7564c..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/LegacyTaskRuntimeEstimator.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-
-public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
-
- private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimates
- = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
- private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
- = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
-
- @Override
- public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
- super.updateAttempt(status, timestamp);
-
-
- TezTaskAttemptID attemptID = status.id;
- TezTaskID taskID = attemptID.getTaskID();
- DAG job = context.getCurrentDAG();
-
- if (job == null) {
- return;
- }
-
- Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-
- if (task == null) {
- return;
- }
-
- TaskAttempt taskAttempt = task.getAttempt(attemptID);
-
- if (taskAttempt == null) {
- return;
- }
-
- Long boxedStart = (Long) startTimes.get(attemptID);
- long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
-
- // We need to do two things.
- // 1: If this is a completion, we accumulate statistics in the superclass
- // 2: If this is not a completion, we learn more about it.
-
- // This is not a completion, but we're cooking.
- //
- if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
- // See if this task is already in the registry
- AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
- AtomicLong estimateVarianceContainer
- = attemptRuntimeEstimateVariances.get(taskAttempt);
-
- if (estimateContainer == null) {
- if (attemptRuntimeEstimates.get(taskAttempt) == null) {
- attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
-
- estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
- }
- }
-
- if (estimateVarianceContainer == null) {
- attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
- estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
- }
-
-
- long estimate = -1;
- long varianceEstimate = -1;
-
- // This code assumes that we'll never consider starting a third
- // speculative task attempt if two are already running for this task
- if (start > 0 && timestamp > start) {
- estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
- varianceEstimate = (long) (estimate * status.progress / 10);
- }
- if (estimateContainer != null) {
- estimateContainer.set(estimate);
- }
- if (estimateVarianceContainer != null) {
- estimateVarianceContainer.set(varianceEstimate);
- }
- }
- }
-
- private long storedPerAttemptValue
- (Map<TaskAttempt, AtomicLong> data, TezTaskAttemptID attemptID) {
- TezTaskID taskID = attemptID.getTaskID();
- DAG job = context.getCurrentDAG();
-
- Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-
- if (task == null) {
- return -1L;
- }
-
- TaskAttempt taskAttempt = task.getAttempt(attemptID);
-
- if (taskAttempt == null) {
- return -1L;
- }
-
- AtomicLong estimate = data.get(taskAttempt);
-
- return estimate == null ? -1L : estimate.get();
-
- }
-
- @Override
- public long estimatedRuntime(TezTaskAttemptID attemptID) {
- return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
- }
-
- @Override
- public long runtimeEstimateVariance(TezTaskAttemptID attemptID) {
- return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
deleted file mode 100644
index 9fa3b4b..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/NullTaskRuntimesEngine.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-
-
-/*
- * This class is provided solely as an exemplae of the values that mean
- * that nothing needs to be computed. It's not currently used.
- */
-public class NullTaskRuntimesEngine implements TaskRuntimeEstimator {
- @Override
- public void enrollAttempt(TaskAttemptStatusOld status, long timestamp) {
- // no code
- }
-
- @Override
- public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
- return Long.MAX_VALUE;
- }
-
- @Override
- public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
- // no code
- }
-
- @Override
- public void contextualize(Configuration conf, AppContext context) {
- // no code
- }
-
- @Override
- public long thresholdRuntime(TezTaskID id) {
- return Long.MAX_VALUE;
- }
-
- @Override
- public long estimatedRuntime(TezTaskAttemptID id) {
- return -1L;
- }
- @Override
- public long estimatedNewAttemptRuntime(TezTaskID id) {
- return -1L;
- }
-
- @Override
- public long runtimeEstimateVariance(TezTaskAttemptID id) {
- return -1L;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
deleted file mode 100644
index d4d0b5a..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/Speculator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-
-/**
- * Speculator component. Task Attempts' status updates are sent to this
- * component. Concrete implementation runs the speculative algorithm and
- * sends the TaskEventType.T_ADD_ATTEMPT.
- *
- * An implementation also has to arrange for the jobs to be scanned from
- * time to time, to launch the speculations.
- */
-public interface Speculator
- extends EventHandler<SpeculatorEvent> {
-
- enum EventType {
- ATTEMPT_STATUS_UPDATE,
- ATTEMPT_START,
- TASK_CONTAINER_NEED_UPDATE,
- JOB_CREATE
- }
-
- // This will be implemented if we go to a model where the events are
- // processed within the TaskAttempts' state transitions' code.
- public void handleAttempt(TaskAttemptStatusOld status);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
deleted file mode 100644
index 917abb6..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/SpeculatorEvent.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-
-public class SpeculatorEvent extends AbstractEvent<Speculator.EventType> {
-
- // valid for ATTEMPT_STATUS_UPDATE
- private TaskAttemptStatusOld reportedStatus;
-
- // valid for TASK_CONTAINER_NEED_UPDATE
- private TezTaskID taskID;
- private int containersNeededChange;
-
- // valid for CREATE_JOB
- private TezDAGID dagId;
-
- public SpeculatorEvent(TezDAGID dagId, long timestamp) {
- super(Speculator.EventType.JOB_CREATE, timestamp);
- this.dagId = dagId;
- }
-
- public SpeculatorEvent(TaskAttemptStatusOld reportedStatus, long timestamp) {
- super(Speculator.EventType.ATTEMPT_STATUS_UPDATE, timestamp);
- this.reportedStatus = reportedStatus;
- }
-
- public SpeculatorEvent(TezTaskAttemptID attemptID, boolean flag, long timestamp) {
- super(Speculator.EventType.ATTEMPT_START, timestamp);
- this.reportedStatus = new TaskAttemptStatusOld();
- this.reportedStatus.id = attemptID;
- this.taskID = attemptID.getTaskID();
- }
-
- /*
- * This c'tor creates a TASK_CONTAINER_NEED_UPDATE event .
- * We send a +1 event when a task enters a state where it wants a container,
- * and a -1 event when it either gets one or withdraws the request.
- * The per job sum of all these events is the number of containers requested
- * but not granted. The intent is that we only do speculations when the
- * speculation wouldn't compete for containers with tasks which need
- * to be run.
- */
- public SpeculatorEvent(TezTaskID taskID, int containersNeededChange) {
- super(Speculator.EventType.TASK_CONTAINER_NEED_UPDATE);
- this.taskID = taskID;
- this.containersNeededChange = containersNeededChange;
- }
-
- public TaskAttemptStatusOld getReportedStatus() {
- return reportedStatus;
- }
-
- public int containersNeededChange() {
- return containersNeededChange;
- }
-
- public TezTaskID getTaskID() {
- return taskID;
- }
-
- public TezDAGID getJobID() {
- return dagId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
deleted file mode 100644
index 68d1369..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/StartEndTimesBase.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-abstract class StartEndTimesBase<V> implements TaskRuntimeEstimator {
- static final float MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE
- = 0.05F;
- static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
- = 1;
-
- protected Configuration conf = null;
- protected AppContext context = null;
-
- protected final Map<TezTaskAttemptID, Long> startTimes
- = new ConcurrentHashMap<TezTaskAttemptID, Long>();
-
- // XXXX This class design assumes that the contents of AppContext.getAllJobs
- // never changes. Is that right?
- //
- // This assumption comes in in several places, mostly in data structure that
- // can grow without limit if a AppContext gets new Job's when the old ones
- // run out. Also, these mapper statistics blocks won't cover the Job's
- // we don't know about.
- // TODO handle multiple DAGs
- protected final Map<TezVertexID, DataStatistics> vertexStatistics
- = new HashMap<TezVertexID, DataStatistics>();
-
- private float slowTaskRelativeTresholds = 0f;
-
- protected final Set<Task> doneTasks = new HashSet<Task>();
-
- @Override
- public void enrollAttempt(TaskAttemptStatusOld status, long timestamp) {
- startTimes.put(status.id,timestamp);
- }
-
- @Override
- public long attemptEnrolledTime(TezTaskAttemptID attemptID) {
- Long result = startTimes.get(attemptID);
-
- return result == null ? Long.MAX_VALUE : result;
- }
-
-
- @Override
- public void contextualize(Configuration conf, AppContext context) {
- this.conf = conf;
- this.context = context;
-
-
- final DAG dag = context.getCurrentDAG();
- for (Entry<TezVertexID, Vertex> entry: dag.getVertices().entrySet()) {
- vertexStatistics.put(entry.getKey(), new DataStatistics());
- slowTaskRelativeTresholds =
- conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f);
- }
- }
-
- protected DataStatistics dataStatisticsForTask(TezTaskID taskID) {
- DAG dag = context.getCurrentDAG();
-
- if (dag == null) {
- return null;
- }
-
- Task task = dag.getVertex(taskID.getVertexID()).getTask(taskID);
-
- if (task == null) {
- return null;
- }
-
- return vertexStatistics.get(taskID.getVertexID());
- }
-
- @Override
- public long thresholdRuntime(TezTaskID taskID) {
- DAG job = context.getCurrentDAG();
-
- DataStatistics statistics = dataStatisticsForTask(taskID);
-
- Vertex v = job.getVertex(taskID.getVertexID());
- int completedTasksOfType = v.getCompletedTasks();
- int totalTasksOfType = v.getTotalTasks();
-
- if (completedTasksOfType < MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
- || (((float)completedTasksOfType) / totalTasksOfType)
- < MINIMUM_COMPLETE_PROPORTION_TO_SPECULATE ) {
- return Long.MAX_VALUE;
- }
-
- long result = statistics == null
- ? Long.MAX_VALUE
- : (long)statistics.outlier(slowTaskRelativeTresholds);
- return result;
- }
-
- @Override
- public long estimatedNewAttemptRuntime(TezTaskID id) {
- DataStatistics statistics = dataStatisticsForTask(id);
-
- if (statistics == null) {
- return -1L;
- }
-
- return (long)statistics.mean();
- }
-
- @Override
- public void updateAttempt(TaskAttemptStatusOld status, long timestamp) {
-
- TezTaskAttemptID attemptID = status.id;
- TezTaskID taskID = attemptID.getTaskID();
- DAG job = context.getCurrentDAG();
-
- if (job == null) {
- return;
- }
-
- Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
-
- if (task == null) {
- return;
- }
-
- Long boxedStart = startTimes.get(attemptID);
- long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
-
- TaskAttempt taskAttempt = task.getAttempt(attemptID);
-
- if (taskAttempt.getState() == TaskAttemptState.SUCCEEDED) {
- boolean isNew = false;
- // is this a new success?
- synchronized (doneTasks) {
- if (!doneTasks.contains(task)) {
- doneTasks.add(task);
- isNew = true;
- }
- }
-
- // It's a new completion
- // Note that if a task completes twice [because of a previous speculation
- // and a race, or a success followed by loss of the machine with the
- // local data] we only count the first one.
- if (isNew) {
- long finish = timestamp;
- if (start > 1L && finish > 1L && start <= finish) {
- long duration = finish - start;
-
- DataStatistics statistics
- = dataStatisticsForTask(taskID);
-
- if (statistics != null) {
- statistics.add(duration);
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
deleted file mode 100644
index a68dc50..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatusOld;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-
-
-
-public interface TaskRuntimeEstimator {
- public void enrollAttempt(TaskAttemptStatusOld reportedStatus, long timestamp);
-
- public long attemptEnrolledTime(TezTaskAttemptID attemptID);
-
- public void updateAttempt(TaskAttemptStatusOld reportedStatus, long timestamp);
-
- public void contextualize(Configuration conf, AppContext context);
-
- /**
- *
- * Find a maximum reasonable execution wallclock time. Includes the time
- * already elapsed.
- *
- * Find a maximum reasonable execution time. Includes the time
- * already elapsed. If the projected total execution time for this task
- * ever exceeds its reasonable execution time, we may speculate it.
- *
- * @param id the {@link TezTaskID} of the task we are asking about
- * @return the task's maximum reasonable runtime, or MAX_VALUE if
- * we don't have enough information to rule out any runtime,
- * however long.
- *
- */
- public long thresholdRuntime(TezTaskID id);
-
- /**
- *
- * Estimate a task attempt's total runtime. Includes the time already
- * elapsed.
- *
- * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
- * @return our best estimate of the attempt's runtime, or {@code -1} if
- * we don't have enough information yet to produce an estimate.
- *
- */
- public long estimatedRuntime(TezTaskAttemptID id);
-
- /**
- *
- * Estimates how long a new attempt on this task will take if we start
- * one now
- *
- * @param id the {@link TezTaskID} of the task we are asking about
- * @return our best estimate of a new attempt's runtime, or {@code -1} if
- * we don't have enough information yet to produce an estimate.
- *
- */
- public long estimatedNewAttemptRuntime(TezTaskID id);
-
- /**
- *
- * Computes the width of the error band of our estimate of the task
- * runtime as returned by {@link #estimatedRuntime(TezTaskAttemptID)}
- *
- * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
- * @return our best estimate of the attempt's runtime, or {@code -1} if
- * we don't have enough information yet to produce an estimate.
- *
- */
- public long runtimeEstimateVariance(TezTaskAttemptID id);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
deleted file mode 100644
index ae2c612..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.speculate;
-
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.records.TezTaskID;
-
-
-public class TaskSpeculationPredicate {
- boolean canSpeculate(AppContext context, TezTaskID taskID) {
- // This class rejects speculating any task that already has speculations,
- // or isn't running.
- // Subclasses should call TaskSpeculationPredicate.canSpeculate(...) , but
- // can be even more restrictive.
- // TODO handle multiple dags
- DAG job = context.getCurrentDAG();
- Task task = job.getVertex(taskID.getVertexID()).getTask(taskID);
- return task.getAttempts().size() == 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java b/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java
deleted file mode 100644
index 8b6d62a..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-@InterfaceAudience.Private
-package org.apache.tez.dag.app.speculate;
-import org.apache.hadoop.classification.InterfaceAudience;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 7dd6df0..b156c11 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -417,32 +417,6 @@ public class TestTaskImpl {
assertTaskSucceededState();
}
-
- @Test
- public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
- TezTaskID taskId = getNewTaskID();
- scheduleTaskAttempt(taskId);
- launchTaskAttempt(mockTask.getLastAttempt().getID());
- updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
-
- // Add a speculative task attempt that succeeds
- mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
- TaskEventType.T_ADD_SPEC_ATTEMPT));
- launchTaskAttempt(mockTask.getLastAttempt().getID());
- mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
- TaskEventType.T_ATTEMPT_SUCCEEDED));
-
- // The task should now have succeeded
- assertTaskSucceededState();
-
- // Now fail the first task attempt, after the second has succeeded
- mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0)
- .getID(), TaskEventType.T_ATTEMPT_FAILED));
-
- // The task should still be in the succeeded state
- assertTaskSucceededState();
-
- }
@SuppressWarnings("rawtypes")
@Test
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/29a97cc1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 7a9cafb..45de43a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -79,11 +79,6 @@ public interface MRJobConfig {
public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
- public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";
-
- public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";
-
- public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";
public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
@@ -201,8 +196,6 @@ public interface MRJobConfig {
public static final String MAP_DEBUG_SCRIPT = "mapreduce.map.debug.script";
- public static final String MAP_SPECULATIVE = "mapreduce.map.speculative";
-
public static final String MAP_FAILURES_MAX_PERCENT = "mapreduce.map.failures.maxpercent";
public static final String MAP_SKIP_INCR_PROC_COUNT = "mapreduce.map.skip.proc-count.auto-incr";
@@ -266,8 +259,6 @@ public interface MRJobConfig {
public static final String REDUCE_DEBUG_SCRIPT = "mapreduce.reduce.debug.script";
- public static final String REDUCE_SPECULATIVE = "mapreduce.reduce.speculative";
-
public static final String SHUFFLE_CONNECT_TIMEOUT = "mapreduce.reduce.shuffle.connect.timeout";
public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
@@ -442,10 +433,6 @@ public interface MRJobConfig {
MR_AM_PREFIX + "job.reduce.rampup.limit";
public static final float DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT = 0.5f;
- /** The class that should be used for speculative execution calculations.*/
- public static final String MR_AM_JOB_SPECULATOR =
- MR_AM_PREFIX + "job.speculator.class";
-
/** Class used to estimate task resource needs.*/
public static final String MR_AM_TASK_ESTIMATOR =
MR_AM_PREFIX + "job.task.estimator.class";
@@ -583,27 +570,6 @@ public interface MRJobConfig {
public static final String APPLICATION_ATTEMPT_ID =
"mapreduce.job.application.attempt.id";
- /**
- * Job end notification.
- */
- public static final String MR_JOB_END_NOTIFICATION_URL =
- "mapreduce.job.end-notification.url";
-
- public static final String MR_JOB_END_NOTIFICATION_PROXY =
- "mapreduce.job.end-notification.proxy";
-
- public static final String MR_JOB_END_RETRY_ATTEMPTS =
- "mapreduce.job.end-notification.retry.attempts";
-
- public static final String MR_JOB_END_RETRY_INTERVAL =
- "mapreduce.job.end-notification.retry.interval";
-
- public static final String MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS =
- "mapreduce.job.end-notification.max.attempts";
-
- public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
- "mapreduce.job.end-notification.max.retry.interval";
-
/*
* MR AM Service Authorization
*/