You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:21 UTC
[15/50] [abbrv] git commit: TEZ-1569. Add tests for preemption (bikas)
TEZ-1569. Add tests for preemption (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5e5683ab
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5e5683ab
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5e5683ab
Branch: refs/heads/branch-0.5
Commit: 5e5683ab5e89f1fd88883ed40574c8fb04316dd1
Parents: 8e382b3
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 12 14:15:56 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 12 14:15:56 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../java/org/apache/tez/client/TezClient.java | 2 +-
.../java/org/apache/tez/client/LocalClient.java | 15 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 15 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 6 +
.../apache/tez/dag/app/MockDAGAppMaster.java | 245 +++++++++++++++++++
.../org/apache/tez/dag/app/MockLocalClient.java | 48 ++++
.../org/apache/tez/dag/app/MockTezClient.java | 48 ++++
.../org/apache/tez/dag/app/TestPreemption.java | 203 +++++++++++++++
9 files changed, 576 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 87729b3..73a3671 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@ ALL CHANGES:
TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized.
TEZ-1575. MRRSleepJob does not pick MR settings for container size and java opts.
TEZ-1578. Remove TeraSort from Tez codebase.
+ TEZ-1569. Add tests for preemption
Release 0.5.1: Unreleased
@@ -33,7 +34,7 @@ ALL CHANGES
TEZ-1534. Make client side configs available to AM and tasks.
TEZ-1574. Support additional formats for the tez deployed archive
-Release 0.5.0: Unreleased
+Release 0.5.0: 2014-09-03
INCOMPATIBLE CHANGES
TEZ-1038. Move TaskLocationHint outside of VertexLocationHint.
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 13ca2dc..77ab20c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -123,7 +123,7 @@ public class TezClient {
}
@Private
- TezClient(String name, TezConfiguration tezConf, boolean isSession,
+ protected TezClient(String name, TezConfiguration tezConf, boolean isSession,
@Nullable Map<String, LocalResource> localResources,
@Nullable Credentials credentials) {
this.clientName = name;
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index 0b615fa..0a95cf0 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +55,8 @@ import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.utils.EnvironmentUpdateUtils;
+import com.google.common.annotations.VisibleForTesting;
+
public class LocalClient extends FrameworkClient {
public static final Logger LOG = Logger.getLogger(LocalClient.class);
@@ -286,8 +289,7 @@ public class LocalClient extends FrameworkClient {
int nmHttpPort = YarnConfiguration.DEFAULT_NM_WEBAPP_PORT;
long appSubmitTime = System.currentTimeMillis();
- dagAppMaster =
- new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+ dagAppMaster = createDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
new SystemClock(),
appSubmitTime, isSession, userDir.toUri().getPath());
clientHandler = new DAGClientHandler(dagAppMaster);
@@ -305,4 +307,13 @@ public class LocalClient extends FrameworkClient {
return thread;
}
+
+  /**
+   * Factory for the app master run by this local client. Test code can override this to
+   * create a mock app.
+   *
+   * @param clock the clock handed to the app master; it is passed through as given so that
+   *              a caller or subclass can supply its own implementation
+   * @return a new {@link DAGAppMaster} built from the given arguments
+   */
+  @VisibleForTesting
+  protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
+      Clock clock, long appSubmitTime, boolean isSession, String userDir) {
+    // use the supplied clock instead of silently replacing it with a new SystemClock
+    return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+        clock, appSubmitTime, isSession, userDir);
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 9cd716a..ea0ab3b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -156,12 +156,12 @@ import com.google.common.base.Function;
import com.google.common.collect.Maps;
/**
- * The Map-Reduce Application Master.
+ * The Tez DAG Application Master.
* The state machine is encapsulated in the implementation of Job interface.
* All state changes happens via Job interface. Each event
* results in a Finite State Transition in Job.
*
- * MR AppMaster is the composition of loosely coupled services. The services
+ * Tez DAG AppMaster is the composition of loosely coupled services. The services
* interact with each other via events. The components resembles the
* Actors model. The component acts on received event and send out the
* events to other components.
@@ -443,6 +443,11 @@ public class DAGAppMaster extends AbstractService {
System.exit(0);
}
}
+
+ /** Returns the task scheduler event handler held by this app master; exposed for tests. */
+ @VisibleForTesting
+ protected TaskSchedulerEventHandler getTaskSchedulerEventHandler() {
+ return taskSchedulerEventHandler;
+ }
private synchronized void handle(DAGAppMasterEvent event) {
switch (event.getType()) {
@@ -1434,14 +1439,14 @@ public class DAGAppMaster extends AbstractService {
}
return null;
}
-
+
@Override
public synchronized void serviceStart() throws Exception {
//start all the components
startServices();
super.serviceStart();
-
+
// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("DAGAppMaster");
@@ -1882,7 +1887,7 @@ public class DAGAppMaster extends AbstractService {
UserGroupInformation.setConfiguration(conf);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-
+
appMaster.appMasterUgi = UserGroupInformation
.createRemoteUser(jobUserName);
appMaster.appMasterUgi.addCredentials(credentials);
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index daaa81b..23f9096 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -105,6 +105,7 @@ import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.dag.utils.RelocalizationUtils;
@@ -690,6 +691,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
return vertex.getVertexStatus(statusOptions);
}
+
+  /**
+   * Looks up a task attempt by walking vertex, then task, then attempt.
+   *
+   * @param taId id of the attempt to find
+   * @return the attempt, cast to {@link TaskAttemptImpl}
+   */
+  public TaskAttemptImpl getTaskAttempt(TezTaskAttemptID taId) {
+    // NOTE(review): no null checks - presumably an unknown vertex or task id ends in a
+    // NullPointerException here; confirm that callers only pass ids belonging to this DAG
+    TezVertexID owningVertexId = taId.getTaskID().getVertexID();
+    return (TaskAttemptImpl) getVertex(owningVertexId)
+        .getTask(taId.getTaskID())
+        .getAttempt(taId);
+  }
protected void initializeVerticesAndStart() {
for (Vertex v : vertices.values()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
new file mode 100644
index 0000000..9fe9c4d
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -0,0 +1,245 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.launcher.ContainerLauncher;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A {@link DAGAppMaster} for tests. It swaps the real container launcher for a
+ * {@link MockContainerLauncher}, which only simulates containers and the tasks run in them.
+ */
+@SuppressWarnings("unchecked")
+public class MockDAGAppMaster extends DAGAppMaster {
+
+  MockContainerLauncher containerLauncher;
+
+  // mock container launcher does not launch real tasks.
+  // Upon launch of a container it simulates the container asking for tasks.
+  // Upon receiving a task it simulates completion of the task.
+  // It can be used to preempt the container for a given task.
+  public class MockContainerLauncher extends AbstractService implements ContainerLauncher, Runnable {
+
+    BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>();
+    Thread eventHandlingThread;
+
+    // simulated containers, keyed by container id
+    Map<ContainerId, ContainerData> containers = Maps.newConcurrentMap();
+    TaskAttemptListenerImpTezDag taListener;
+
+    // scheduling passes are only made while this is true
+    AtomicBoolean startScheduling = new AtomicBoolean(true);
+    // shared with the test; used as monitor and flag for the start-up handshake
+    AtomicBoolean goFlag;
+
+    // task id -> highest attempt number whose container should be preempted
+    Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap();
+
+    public MockContainerLauncher(AtomicBoolean goFlag) {
+      super("MockContainerLauncher");
+      this.goFlag = goFlag;
+    }
+
+    /** Book-keeping for one simulated container and the attempt currently assigned to it. */
+    public class ContainerData {
+      ContainerId cId;
+      TezTaskAttemptID taId;
+      String vName;
+      boolean completed;
+
+      public ContainerData(ContainerId cId) {
+        this.cId = cId;
+      }
+
+      // forget the assigned attempt so that the container asks for a new task
+      void clear() {
+        taId = null;
+        vName = null;
+        completed = false;
+      }
+    }
+
+    /** Grabs the task attempt listener of the app master and starts the simulation thread. */
+    @Override
+    public void serviceStart() throws Exception {
+      taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
+      eventHandlingThread = new Thread(this);
+      eventHandlingThread.start();
+    }
+
+    /** Interrupts the simulation thread and waits up to two seconds for it to exit. */
+    @Override
+    public void serviceStop() throws Exception {
+      if (eventHandlingThread != null) {
+        eventHandlingThread.interrupt();
+        eventHandlingThread.join(2000L);
+      }
+    }
+
+    /** Dispatches launch and stop requests to the simulated container list. */
+    @Override
+    public void handle(NMCommunicatorEvent event) {
+      switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        launch((NMCommunicatorLaunchRequestEvent) event);
+        break;
+      case CONTAINER_STOP_REQUEST:
+        stop((NMCommunicatorStopRequestEvent) event);
+        break;
+      default:
+        // any other event type is ignored, as before
+        break;
+      }
+    }
+
+    /**
+     * Start-up handshake with the test: raise the go flag, wake the test thread and then
+     * block until the test notifies back.
+     */
+    void waitToGo() {
+      synchronized (goFlag) {
+        goFlag.set(true);
+        goFlag.notify();
+        try {
+          // NOTE(review): this wait is not guarded by a condition loop, so a spurious
+          // wakeup would let this thread continue before the test has synced - confirm
+          // whether the handshake needs a second flag
+          goFlag.wait();
+        } catch (InterruptedException e) {
+          // keep the interrupt visible to whoever handles the unchecked exception
+          Thread.currentThread().interrupt();
+          throw new TezUncheckedException(e);
+        }
+      }
+    }
+
+    /** Turns the simulated scheduling passes on or off. */
+    public void startScheduling(boolean value) {
+      startScheduling.set(value);
+    }
+
+    public Map<ContainerId, ContainerData> getContainers() {
+      return containers;
+    }
+
+    /**
+     * Requests preemption of the container running the given task for every attempt whose
+     * number is less than or equal to {@code uptoVersion}.
+     */
+    public void preemptContainerForTask(TezTaskID tId, int uptoVersion) {
+      preemptedTasks.put(tId, uptoVersion);
+    }
+
+    /** Reports the container as completed with a PREEMPTED exit status and frees its slot. */
+    public void preemptContainer(ContainerData cData) {
+      getTaskSchedulerEventHandler().containerCompleted(null,
+          ContainerStatus.newInstance(cData.cId, null, "Preempted", ContainerExitStatus.PREEMPTED));
+      cData.clear();
+    }
+
+    void stop(NMCommunicatorStopRequestEvent event) {
+      // remove from simulated container list
+      containers.remove(event.getContainerId());
+      getContext().getEventHandler().handle(
+          new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+    }
+
+    void launch(NMCommunicatorLaunchRequestEvent event) {
+      // launch container by putting it in simulated container list
+      containers.put(event.getContainerId(), new ContainerData(event.getContainerId()));
+      getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId()));
+    }
+
+    /**
+     * Simulation loop: after the handshake with the test, makes a scheduling pass roughly
+     * every 10 ms while scheduling is enabled, and exits when the thread is interrupted.
+     */
+    @Override
+    public void run() {
+      // wait for test to sync with us and get a reference to us. Go when sync is done
+      waitToGo();
+      while (true) {
+        // schedule only when asked to do so by the test code. While paused we still fall
+        // through to the sleep below, so the thread neither spins nor misses an interrupt.
+        if (startScheduling.get()) {
+          scheduleContainers();
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          System.out.println("Interrupted in mock container launcher thread");
+          break;
+        }
+      }
+    }
+
+    // One scheduling pass: idle containers ask for a task, assigned tasks are either
+    // preempted or reported as completed.
+    private void scheduleContainers() {
+      for (Map.Entry<ContainerId, ContainerData> entry : containers.entrySet()) {
+        ContainerData cData = entry.getValue();
+        ContainerId cId = entry.getKey();
+        if (cData.taId == null) {
+          // if container is not assigned a task, ask for a task
+          try {
+            ContainerTask cTask = taListener.getTask(new ContainerContext(cId.toString()));
+            if (cTask == null) {
+              continue;
+            }
+            if (cTask.shouldDie()) {
+              containers.remove(cId);
+            } else {
+              cData.taId = cTask.getTaskSpec().getTaskAttemptID();
+              cData.vName = cTask.getTaskSpec().getVertexName();
+            }
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        } else if (!cData.completed) {
+          // container is assigned a task and task is not completed
+          // complete the task or preempt the task
+          Integer version = preemptedTasks.get(cData.taId.getTaskID());
+          if (version != null && cData.taId.getId() <= version.intValue()) {
+            preemptContainer(cData);
+          } else {
+            // send a done notification
+            TezVertexID vertexId = cData.taId.getTaskID().getVertexID();
+            cData.completed = true;
+            getContext().getEventHandler().handle(
+                new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent(
+                    new TaskAttemptCompletedEvent(), new EventMetaData(
+                        EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId)))));
+            cData.clear();
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
+   * @param launcherGoFlag flag shared with the test and handed to the mock launcher for
+   *                       its start-up handshake
+   */
+  public MockDAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId,
+      String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime,
+      boolean isSession, String workingDirectory, AtomicBoolean launcherGoFlag) {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
+        isSession, workingDirectory);
+    containerLauncher = new MockContainerLauncher(launcherGoFlag);
+  }
+
+  // use mock container launcher for tests
+  @Override
+  protected ContainerLauncher createContainerLauncher(final AppContext context)
+      throws UnknownHostException {
+    return containerLauncher;
+  }
+
+  public MockContainerLauncher getContainerLauncher() {
+    return containerLauncher;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
new file mode 100644
index 0000000..7e408e1
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.client.LocalClient;
+
+/**
+ * A {@link LocalClient} for tests. It creates a {@link MockDAGAppMaster} in place of the
+ * real app master and remembers it so that the test can fetch it.
+ */
+public class MockLocalClient extends LocalClient {
+  // the most recently created mock app master; null until createDAGAppMaster has run
+  MockDAGAppMaster mockApp;
+  // flag shared with the test, handed on to the mock app master's launcher
+  AtomicBoolean mockAppLauncherGoFlag;
+
+  public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag) {
+    this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
+  }
+
+  /**
+   * Creates the mock app master with the arguments supplied by {@link LocalClient},
+   * including the given clock, and keeps a reference to it.
+   */
+  @Override
+  protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId cId, String currentHost, int nmPort, int nmHttpPort,
+      Clock clock, long appSubmitTime, boolean isSession, String userDir) {
+    // pass the supplied clock through instead of replacing it with a new SystemClock
+    mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+        clock, appSubmitTime, isSession, userDir, mockAppLauncherGoFlag);
+    return mockApp;
+  }
+
+  public MockDAGAppMaster getMockApp() {
+    return mockApp;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
new file mode 100644
index 0000000..617415e
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.FrameworkClient;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+
+/**
+ * A {@link TezClient} for tests whose framework client is always a {@link MockLocalClient}.
+ */
+public class MockTezClient extends TezClient {
+  MockLocalClient client;
+
+  /**
+   * @param mockAppLauncherGoFlag flag shared with the test, handed on to the mock local client
+   */
+  MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
+      Map<String, LocalResource> localResources, Credentials credentials,
+      AtomicBoolean mockAppLauncherGoFlag) {
+    super(name, tezConf, isSession, localResources, credentials);
+    this.client = new MockLocalClient(mockAppLauncherGoFlag);
+  }
+
+  // @Override makes the compiler reject this class if the hook in TezClient ever changes
+  // its signature, instead of silently falling back to the real framework client
+  @Override
+  protected FrameworkClient createFrameworkClient() {
+    return client;
+  }
+
+  public MockLocalClient getLocalClient() {
+    return client;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/5e5683ab/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
new file mode 100644
index 0000000..c7aacd4
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -0,0 +1,203 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Runs small two-vertex DAGs against {@link MockDAGAppMaster} in local mode, preempts the
+ * container of task 0 of one vertex for a number of attempts and checks that the DAG still
+ * succeeds while the preempted attempts end up KILLED.
+ */
+@SuppressWarnings("deprecation")
+public class TestPreemption {
+
+  static Configuration defaultConf;
+  static FileSystem localFs;
+  static Path workDir;
+
+  static {
+    try {
+      defaultConf = new Configuration(false);
+      defaultConf.set("fs.defaultFS", "file:///");
+      defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+          "TestDAGAppMaster").makeQualified(localFs);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  // both are set by syncWithMockAppLauncher once the mock app master is up
+  MockDAGAppMaster mockApp;
+  MockContainerLauncher mockLauncher;
+
+  int dagCount = 0;
+
+  /** Builds a DAG "A -> B" with 5 tasks per vertex, connected by an edge of the given type. */
+  DAG createDAG(DataMovementType dmType) {
+    DAG dag = DAG.create("test-" + dagCount++);
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
+    Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 5);
+    Edge eAB = Edge.create(vA, vB,
+        EdgeProperty.create(dmType, DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL, OutputDescriptor.create("O.class"),
+            InputDescriptor.create("I.class")));
+
+    dag.addVertex(vA).addVertex(vB).addEdge(eAB);
+    return dag;
+  }
+
+  @Test
+  public void testPreemptionWithoutSession() throws Exception {
+    System.out.println("TestPreemptionWithoutSession");
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    // NOTE(review): presumably zero allowed failures means the DAG can only succeed if
+    // preempted attempts are not counted as failures - confirm against the config docs
+    tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
+    AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
+    MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null,
+        mockAppLauncherGoFlag);
+    tezClient.start();
+    try {
+      DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER));
+      // now the MockApp has been started. sync with it to get the launcher
+      syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
+      preemptAndVerify(dagClient, 0, 3);
+    } finally {
+      // stop the client even when an assertion fails, so nothing leaks into other tests
+      tezClient.stop();
+    }
+  }
+
+  @Test
+  public void testPreemptionWithSession() throws Exception {
+    System.out.println("TestPreemptionWithSession");
+    MockTezClient tezClient = createTezSession();
+    try {
+      testPreemptionSingle(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 0, "Scatter-Gather");
+      testPreemptionMultiple(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 0, "Scatter-Gather");
+      testPreemptionSingle(tezClient, createDAG(DataMovementType.BROADCAST), 0, "Broadcast");
+      testPreemptionMultiple(tezClient, createDAG(DataMovementType.BROADCAST), 0, "Broadcast");
+      testPreemptionSingle(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 0, "1-1");
+      testPreemptionMultiple(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 0, "1-1");
+      testPreemptionSingle(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 1, "Scatter-Gather");
+      testPreemptionMultiple(tezClient, createDAG(DataMovementType.SCATTER_GATHER), 1, "Scatter-Gather");
+      testPreemptionSingle(tezClient, createDAG(DataMovementType.BROADCAST), 1, "Broadcast");
+      testPreemptionMultiple(tezClient, createDAG(DataMovementType.BROADCAST), 1, "Broadcast");
+      testPreemptionSingle(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 1, "1-1");
+      testPreemptionMultiple(tezClient, createDAG(DataMovementType.ONE_TO_ONE), 1, "1-1");
+    } finally {
+      // stop the session even when one of the runs fails
+      tezClient.stop();
+    }
+  }
+
+  /** Starts a session-mode client and syncs with its mock launcher, scheduling turned off. */
+  MockTezClient createTezSession() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
+    AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
+    MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, true, null, null,
+        mockAppLauncherGoFlag);
+    tezClient.start();
+    syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
+    return tezClient;
+  }
+
+  /**
+   * Waits until the mock launcher thread has raised the go flag, captures the mock app and
+   * launcher, applies the requested scheduling state and then releases the launcher thread.
+   */
+  void syncWithMockAppLauncher(boolean allowScheduling, AtomicBoolean mockAppLauncherGoFlag,
+      MockTezClient tezClient) throws Exception {
+    synchronized (mockAppLauncherGoFlag) {
+      while (!mockAppLauncherGoFlag.get()) {
+        mockAppLauncherGoFlag.wait();
+      }
+      mockApp = tezClient.getLocalClient().getMockApp();
+      mockLauncher = mockApp.getContainerLauncher();
+      mockLauncher.startScheduling(allowScheduling);
+      mockAppLauncherGoFlag.notify();
+    }
+  }
+
+  // preempt only attempt 0 of the task
+  void testPreemptionSingle(MockTezClient tezClient, DAG dag, int vertexIndex, String info)
+      throws Exception {
+    testPreemptionJob(tezClient, dag, vertexIndex, 0, info + "-Single");
+  }
+
+  // preempt attempts 0 through 3 of the task
+  void testPreemptionMultiple(MockTezClient tezClient, DAG dag, int vertexIndex, String info)
+      throws Exception {
+    testPreemptionJob(tezClient, dag, vertexIndex, 3, info + "-Multiple");
+  }
+
+  /** Submits the DAG to an already synced session with scheduling paused, then verifies. */
+  void testPreemptionJob(MockTezClient tezClient, DAG dag, int vertexIndex,
+      int upToTaskVersion, String info) throws Exception {
+    System.out.println("TestPreemption - Running - " + info);
+
+    mockLauncher.startScheduling(false); // turn off scheduling to block DAG before submitting it
+    DAGClient dagClient = tezClient.submitDAG(dag);
+
+    preemptAndVerify(dagClient, vertexIndex, upToTaskVersion);
+
+    System.out.println("TestPreemption - Done running - " + info);
+  }
+
+  // Registers the preemption for task 0 of the given vertex of the current DAG, lets the
+  // launcher run, and checks that the DAG succeeded and that attempts 0..upToTaskVersion
+  // were KILLED.
+  private void preemptAndVerify(DAGClient dagClient, int vertexIndex, int upToTaskVersion)
+      throws Exception {
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), vertexIndex);
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+
+    mockLauncher.preemptContainerForTask(taId.getTaskID(), upToTaskVersion);
+    mockLauncher.startScheduling(true);
+
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+
+    for (int i = 0; i <= upToTaskVersion; ++i) {
+      TezTaskAttemptID testTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i);
+      TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId);
+      Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+    }
+  }
+}