You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:50:00 UTC

[15/25] git commit: TEZ-1569. Add tests for preemption (bikas)

TEZ-1569. Add tests for preemption (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5e5683ab
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5e5683ab
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5e5683ab

Branch: refs/heads/TEZ-8
Commit: 5e5683ab5e89f1fd88883ed40574c8fb04316dd1
Parents: 8e382b3
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 12 14:15:56 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 12 14:15:56 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../java/org/apache/tez/client/TezClient.java   |   2 +-
 .../java/org/apache/tez/client/LocalClient.java |  15 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  15 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   6 +
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 245 +++++++++++++++++++
 .../org/apache/tez/dag/app/MockLocalClient.java |  48 ++++
 .../org/apache/tez/dag/app/MockTezClient.java   |  48 ++++
 .../org/apache/tez/dag/app/TestPreemption.java  | 203 +++++++++++++++
 9 files changed, 576 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 87729b3..73a3671 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@ ALL CHANGES:
   TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized.
   TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts.
   TEZ-1578. Remove TeraSort from Tez codebase.
+  TEZ-1569. Add tests for preemption
 
 Release 0.5.1: Unreleased
 
@@ -33,7 +34,7 @@ ALL CHANGES
   TEZ-1534. Make client side configs available to AM and tasks.
   TEZ-1574. Support additional formats for the tez deployed archive
 
-Release 0.5.0: Unreleased
+Release 0.5.0: 2014-09-03
 
 INCOMPATIBLE CHANGES
   TEZ-1038. Move TaskLocationHint outside of VertexLocationHint.

http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 13ca2dc..77ab20c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -123,7 +123,7 @@ public class TezClient {
   }
 
   @Private
-  TezClient(String name, TezConfiguration tezConf, boolean isSession,
+  protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
             @Nullable Map<String, LocalResource> localResources,
             @Nullable Credentials credentials) {
     this.clientName = name;

http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 0b615fa..0a95cf0 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +55,8 @@ import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.utils.EnvironmentUpdateUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class LocalClient extends FrameworkClient {
   public static final Logger LOG = Logger.getLogger(LocalClient.class);
 
@@ -286,8 +289,7 @@ public class LocalClient extends FrameworkClient {
           int nmHttpPort = YarnConfiguration.DEFAULT_NM_WEBAPP_PORT;
           long appSubmitTime = System.currentTimeMillis();
 
-          dagAppMaster =
-              new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+          dagAppMaster = createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
                   new SystemClock(),
                   appSubmitTime, isSession, userDir.toUri().getPath());
           clientHandler = new DAGClientHandler(dagAppMaster);
@@ -305,4 +307,13 @@ public class LocalClient extends FrameworkClient {
 
     return thread;
   }
+  
+  // this can be overridden by test code to create a mock app
+  @VisibleForTesting
+  protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
+      Clock clock, long appSubmitTime, boolean isSession, String userDir) {
+    return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+        new SystemClock(), appSubmitTime, isSession, userDir);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9cd716a..ea0ab3b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -156,12 +156,12 @@ import com.google.common.base.Function;
 import com.google.common.collect.Maps;
 
 /**
- * The Map-Reduce Application Master.
+ * The Tez DAG Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
  * All state changes happens via Job interface. Each event
  * results in a Finite State Transition in Job.
  *
- * MR AppMaster is the composition of loosely coupled services. The services
+ * Tez DAG AppMaster is the composition of loosely coupled services. The services
  * interact with each other via events. The components resembles the
  * Actors model. The component acts on received event and send out the
  * events to other components.
@@ -443,6 +443,11 @@ public class DAGAppMaster extends AbstractService {
       System.exit(0);
     }
   }
+  
+  @VisibleForTesting
+  protected TaskSchedulerEventHandler getTaskSchedulerEventHandler() {
+    return taskSchedulerEventHandler;
+  }
 
   private synchronized void handle(DAGAppMasterEvent event) {
     switch (event.getType()) {
@@ -1434,14 +1439,14 @@ public class DAGAppMaster extends AbstractService {
     }
     return null;
   }
-
+  
   @Override
   public synchronized void serviceStart() throws Exception {
 
     //start all the components
     startServices();
     super.serviceStart();
-
+    
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("DAGAppMaster");
@@ -1882,7 +1887,7 @@ public class DAGAppMaster extends AbstractService {
 
     UserGroupInformation.setConfiguration(conf);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-
+    
     appMaster.appMasterUgi = UserGroupInformation
         .createRemoteUser(jobUserName);
     appMaster.appMasterUgi.addCredentials(credentials);

http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index daaa81b..23f9096 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -105,6 +105,7 @@ import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
 import org.apache.tez.dag.utils.RelocalizationUtils;
@@ -690,6 +691,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
     return vertex.getVertexStatus(statusOptions);
   }
+  
+  public TaskAttemptImpl getTaskAttempt(TezTaskAttemptID taId) {
+    return (TaskAttemptImpl) getVertex(taId.getTaskID().getVertexID()).getTask(taId.getTaskID())
+        .getAttempt(taId);
+  }
 
   protected void initializeVerticesAndStart() {
     for (Vertex v : vertices.values()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
new file mode 100644
index 0000000..9fe9c4d
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -0,0 +1,245 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.launcher.ContainerLauncher;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("unchecked")
+public class MockDAGAppMaster extends DAGAppMaster {
+  
+  MockContainerLauncher containerLauncher;
+  
+  // mock container launcher does not launch real tasks.
+  // Upon, launch of a container is simulates the container asking for tasks
+  // Upon receiving a task it simulates completion of the tasks
+  // It can be used to preempt the container for a given task
+  public class MockContainerLauncher extends AbstractService implements ContainerLauncher, Runnable {
+
+    BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>();
+    Thread eventHandlingThread;
+    
+    Map<ContainerId, ContainerData> containers = Maps.newConcurrentMap();
+    TaskAttemptListenerImpTezDag taListener;
+    
+    AtomicBoolean startScheduling = new AtomicBoolean(true);
+    AtomicBoolean goFlag;
+    
+    Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap();
+    
+    public MockContainerLauncher(AtomicBoolean goFlag) {
+      super("MockContainerLauncher");
+      this.goFlag = goFlag;
+    }
+
+    public class ContainerData {
+      ContainerId cId;
+      TezTaskAttemptID taId;
+      String vName;
+      boolean completed;
+      
+      public ContainerData(ContainerId cId) {
+        this.cId = cId;
+      }
+      
+      void clear() {
+        taId = null;
+        vName = null;
+        completed = false;
+      }
+    }
+    
+    @Override
+    public void serviceStart() throws Exception {
+      taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
+      eventHandlingThread = new Thread(this);
+      eventHandlingThread.start();
+    }
+
+    @Override
+    public void serviceStop() throws Exception {
+      if (eventHandlingThread != null) {
+        eventHandlingThread.interrupt();
+        eventHandlingThread.join(2000l);
+      }
+    }
+    
+    @Override
+    public void handle(NMCommunicatorEvent event) {
+      switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        launch((NMCommunicatorLaunchRequestEvent) event);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        stop((NMCommunicatorStopRequestEvent)event);
+        break;
+      }
+    }
+    
+    
+    void waitToGo() {
+      synchronized (goFlag) {
+        goFlag.set(true);
+        goFlag.notify();
+        try {
+          goFlag.wait();
+        } catch (InterruptedException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
+    }
+    
+    public void startScheduling(boolean value) {
+      startScheduling.set(value);
+    }
+
+    public Map<ContainerId, ContainerData> getContainers() {
+      return containers;
+    }
+    
+    public void preemptContainerForTask(TezTaskID tId, int uptoVersion) {
+      preemptedTasks.put(tId, uptoVersion);
+    }
+    
+    public void preemptContainer(ContainerData cData) {
+      getTaskSchedulerEventHandler().containerCompleted(null, 
+          ContainerStatus.newInstance(cData.cId, null, "Preempted", ContainerExitStatus.PREEMPTED));
+      cData.clear();
+    }
+    
+    void stop(NMCommunicatorStopRequestEvent event) {
+      // remove from simulated container list
+      containers.remove(event.getContainerId());
+      getContext().getEventHandler().handle(
+          new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+    }
+
+    void launch(NMCommunicatorLaunchRequestEvent event) {
+      // launch container by putting it in simulated container list
+      containers.put(event.getContainerId(), new ContainerData(event.getContainerId()));
+      getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId()));      
+    }
+
+    @Override
+    public void run() {
+      // wait for test to sync with us and get a reference to us. Go when sync is done
+      waitToGo();
+      while(true) {
+        if (!startScheduling.get()) { // schedule when asked to do so by the test code
+          continue;
+        }
+        for (Map.Entry<ContainerId, ContainerData> entry : containers.entrySet()) {
+          ContainerData cData = entry.getValue();
+          ContainerId cId = entry.getKey();
+          if (cData.taId == null) {
+            // if container is not assigned a task, ask for a task
+            try {
+              ContainerTask cTask = taListener.getTask(new ContainerContext(cId.toString()));
+              if (cTask == null) {
+                continue;
+              }
+              if (cTask.shouldDie()) {
+                containers.remove(cId);
+              } else {
+                cData.taId = cTask.getTaskSpec().getTaskAttemptID();
+                cData.vName = cTask.getTaskSpec().getVertexName();
+              }
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+          } else if (!cData.completed) {
+            // container is assigned a task and task is not completed
+            // complete the task or preempt the task
+            Integer version = preemptedTasks.get(cData.taId.getTaskID()); 
+            if (version != null && cData.taId.getId() <= version.intValue()) {
+              preemptContainer(cData);
+            } else {
+              // send a done notification
+              TezVertexID vertexId = cData.taId.getTaskID().getVertexID();
+              cData.completed = true;
+              getContext().getEventHandler().handle(
+                  new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent(
+                      new TaskAttemptCompletedEvent(), new EventMetaData(
+                          EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)))));
+              cData.clear();
+            }
+          }
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          System.out.println("Interrupted in mock container launcher thread");
+          break;
+        }
+      }
+    }
+    
+  }
+
+  public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
+      String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime,
+      boolean isSession, String workingDirectory, AtomicBoolean launcherGoFlag) {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
+        isSession, workingDirectory);
+    containerLauncher = new MockContainerLauncher(launcherGoFlag);
+  }
+  
+  // use mock container launcher for tests
+  @Override
+  protected ContainerLauncher createContainerLauncher(final AppContext context)
+      throws UnknownHostException {
+    return containerLauncher;
+  }
+  
+  public MockContainerLauncher getContainerLauncher() {
+    return containerLauncher;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
new file mode 100644
index 0000000..7e408e1
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.LocalClient;
+
+public class MockLocalClient extends LocalClient {
+  MockDAGAppMaster mockApp;
+  AtomicBoolean mockAppLauncherGoFlag;
+  
+  public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag) {
+    this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
+  }
+  
+  protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
+      Clock clock, long appSubmitTime, boolean isSession, String userDir) {
+    mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+        new SystemClock(), appSubmitTime, isSession, userDir, mockAppLauncherGoFlag);
+    return mockApp;
+  }
+  
+  public MockDAGAppMaster getMockApp() {
+    return mockApp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
new file mode 100644
index 0000000..617415e
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+
+public class MockTezClient extends TezClient {
+  MockLocalClient client;
+  
+  MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
+      Map<String, LocalResource> localResources, Credentials credentials,
+      AtomicBoolean mockAppLauncherGoFlag) {
+    super(name, tezConf, isSession, localResources, credentials);
+    this.client = new MockLocalClient(mockAppLauncherGoFlag);
+  }
+  
+  protected FrameworkClient createFrameworkClient() {
+    return client;
+  }
+  
+  public MockLocalClient getLocalClient() {
+    return client;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
new file mode 100644
index 0000000..c7aacd4
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -0,0 +1,203 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class TestPreemption {
+  
+  static Configuration defaultConf;
+  static FileSystem localFs;
+  static Path workDir;
+  
+  static {
+    try {
+      defaultConf = new Configuration(false);
+      defaultConf.set("fs.defaultFS", "file:///");
+      defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+          "TestDAGAppMaster").makeQualified(localFs);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  
+  MockDAGAppMaster mockApp;    
+  MockContainerLauncher mockLauncher;
+  
+  int dagCount = 0;
+  
+  DAG createDAG(DataMovementType dmType) {
+    DAG dag = DAG.create("test-" + dagCount++);
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
+    Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 5);
+    Edge eAB = Edge.create(vA, vB, 
+    EdgeProperty.create(dmType, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, OutputDescriptor.create("O.class"),
+        InputDescriptor.create("I.class")));
+    
+    dag.addVertex(vA).addVertex(vB).addEdge(eAB);
+    return dag;
+  }
+  
+  @Test
+  public void testPreemptionWithoutSession() throws Exception {
+    System.out.println("TestPreemptionWithoutSession");
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
+    AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
+    MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null,
+        mockAppLauncherGoFlag);
+    tezClient.start();
+    
+    DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER));
+    // now the MockApp has been started. sync with it to get the launcher
+    syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
+
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    int vertexIndex = 0;
+    int upToTaskVersion = 3;
+    TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), vertexIndex);
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+
+    mockLauncher.preemptContainerForTask(taId.getTaskID(), upToTaskVersion);
+    mockLauncher.startScheduling(true);
+    
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+
+    for (int i=0; i<=upToTaskVersion; ++i) {
+      TezTaskAttemptID testTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i);      
+      TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId);
+      Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+    }
+    
+    tezClient.stop();
+  }
+  
+  @Test
+  public void testPreemptionWithSession() throws Exception {
+    System.out.println("TestPreemptionWithSession");
+    MockTezClient tezClient = createTezSession();
+    testPreemptionSingle(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 0, "Scatter-Gather");
+    testPreemptionMultiple(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 0, "Scatter-Gather");
+    testPreemptionSingle(tezClient, createDAG(DataMovementType.BROADCAST), 0, "Broadcast");
+    testPreemptionMultiple(tezClient, createDAG(DataMovementType.BROADCAST), 0, "Broadcast");
+    testPreemptionSingle(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 0, "1-1");
+    testPreemptionMultiple(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 0, "1-1");
+    testPreemptionSingle(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 1, "Scatter-Gather");
+    testPreemptionMultiple(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 1, "Scatter-Gather");
+    testPreemptionSingle(tezClient, createDAG(DataMovementType.BROADCAST), 1, "Broadcast");
+    testPreemptionMultiple(tezClient, createDAG(DataMovementType.BROADCAST), 1, "Broadcast");
+    testPreemptionSingle(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 1, "1-1");
+    testPreemptionMultiple(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 1, "1-1");
+    tezClient.stop();
+  }
+  
+  MockTezClient createTezSession() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
+    AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
+    MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, true, null, null,
+        mockAppLauncherGoFlag);
+    tezClient.start();
+    syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
+    return tezClient;
+  }
+  
+  void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag, 
+      MockTezClient tezClient) throws Exception {
+    synchronized (mockAppLauncherGoFlag) {
+      while (!mockAppLauncherGoFlag.get()) {
+        mockAppLauncherGoFlag.wait();
+      }
+      mockApp = tezClient.getLocalClient().getMockApp();
+      mockLauncher = mockApp.getContainerLauncher();
+      mockLauncher.startScheduling(allowScheduling);
+      mockAppLauncherGoFlag.notify();
+    }     
+  }
+  
+  void testPreemptionSingle(MockTezClient tezClient, DAG dag, int vertexIndex, String info)
+      throws Exception {
+    testPreemptionJob(tezClient, dag, vertexIndex, 0, info + "-Single");
+  }
+
+  void testPreemptionMultiple(MockTezClient tezClient, DAG dag, int vertexIndex, String info)
+      throws Exception {
+    testPreemptionJob(tezClient, dag, vertexIndex, 3, info + "-Multiple");
+  }
+
+  void testPreemptionJob(MockTezClient tezClient, DAG dag, int vertexIndex,
+      int upToTaskVersion, String info) throws Exception {
+    System.out.println("TestPreemption - Running - " + info);
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
+    
+    mockLauncher.startScheduling(false); // turn off scheduling to block DAG before submitting it
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), vertexIndex);
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+
+    mockLauncher.preemptContainerForTask(taId.getTaskID(), upToTaskVersion);
+    mockLauncher.startScheduling(true);
+    
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    
+    for (int i=0; i<=upToTaskVersion; ++i) {
+      TezTaskAttemptID testTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i);      
+      TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId);
+      Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+    }
+    
+    System.out.println("TestPreemption - Done running - " + info);
+  }
+}