You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:49 UTC
[31/50] [abbrv] tez git commit: TEZ-2005. Define basic interface for
pluggable TaskScheduler. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index d4cf317..1e76dc9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -37,6 +37,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.math3.random.RandomDataGenerator;
@@ -59,10 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.common.ContainerSignatureMatcher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -80,17 +81,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
}
*/
-public class YarnTaskSchedulerService extends TaskSchedulerService
+public class YarnTaskSchedulerService extends TaskScheduler
implements AMRMClientAsync.CallbackHandler {
private static final Logger LOG = LoggerFactory.getLogger(YarnTaskSchedulerService.class);
final TezAMRMClientAsync<CookieContainerRequest> amRmClient;
- final TaskSchedulerAppCallback realAppClient;
- final TaskSchedulerAppCallback appClientDelegate;
final ContainerSignatureMatcher containerSignatureMatcher;
- ExecutorService appCallbackExecutor;
// Container Re-Use configuration
private boolean shouldReuseContainers;
@@ -131,7 +129,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
final String appHostName;
final int appHostPort;
final String appTrackingUrl;
- final AppContext appContext;
private AtomicBoolean hasUnregistered = new AtomicBoolean(false);
AtomicBoolean isStopped = new AtomicBoolean(false);
@@ -150,6 +147,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet();
RandomDataGenerator random = new RandomDataGenerator();
+ private final Configuration conf;
@VisibleForTesting
protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
@@ -213,51 +211,29 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
}
- public YarnTaskSchedulerService(TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher,
- String appHostName,
- int appHostPort,
- String appTrackingUrl,
- AppContext appContext) {
- super(YarnTaskSchedulerService.class.getName());
- this.realAppClient = appClient;
- this.appCallbackExecutor = createAppCallbackExecutorService();
- this.containerSignatureMatcher = containerSignatureMatcher;
- this.appClientDelegate = createAppCallbackDelegate(appClient);
+ public YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
+ super(taskSchedulerContext);
+ this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this);
- this.appHostName = appHostName;
- this.appHostPort = appHostPort;
- this.appTrackingUrl = appTrackingUrl;
- this.appContext = appContext;
+ this.appHostName = taskSchedulerContext.getAppHostName();
+ this.appHostPort = taskSchedulerContext.getAppClientPort();
+ this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
+ this.conf = taskSchedulerContext.getInitialConfiguration();
}
@Private
@VisibleForTesting
- YarnTaskSchedulerService(TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher,
- String appHostName,
- int appHostPort,
- String appTrackingUrl,
- TezAMRMClientAsync<CookieContainerRequest> client,
- AppContext appContext) {
- super(YarnTaskSchedulerService.class.getName());
- this.realAppClient = appClient;
- this.appCallbackExecutor = createAppCallbackExecutorService();
- this.containerSignatureMatcher = containerSignatureMatcher;
- this.appClientDelegate = createAppCallbackDelegate(appClient);
+ YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext,
+ TezAMRMClientAsync<CookieContainerRequest> client) {
+ super(taskSchedulerContext);
+ this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
this.amRmClient = client;
- this.appHostName = appHostName;
- this.appHostPort = appHostPort;
- this.appTrackingUrl = appTrackingUrl;
- this.appContext = appContext;
+ this.appHostName = taskSchedulerContext.getAppHostName();
+ this.appHostPort = taskSchedulerContext.getAppClientPort();
+ this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
+ this.conf = taskSchedulerContext.getInitialConfiguration();
}
- @VisibleForTesting
- ExecutorService createAppCallbackExecutorService() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
- }
-
@Override
public Resource getAvailableResources() {
return amRmClient.getAvailableResources();
@@ -269,12 +245,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
return amRmClient.getClusterNodeCount();
}
- TaskSchedulerAppCallback createAppCallbackDelegate(
- TaskSchedulerAppCallback realAppClient) {
- return new TaskSchedulerAppCallbackWrapper(realAppClient,
- appCallbackExecutor);
- }
-
@Override
public void setShouldUnregister() {
this.shouldUnregister.set(true);
@@ -287,8 +257,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// AbstractService methods
@Override
- public synchronized void serviceInit(Configuration conf) {
+ public synchronized void initialize() {
+ // TODO Post TEZ-2003. Make all of these final fields.
amRmClient.init(conf);
int heartbeatIntervalMax = conf.getInt(
TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
@@ -361,7 +332,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
@Override
- public void serviceStart() {
+ public void start() {
try {
RegisterApplicationMasterResponse response;
synchronized (this) {
@@ -371,7 +342,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
appTrackingUrl);
}
// upcall to app outside locks
- appClientDelegate.setApplicationRegistrationData(
+ getContext().setApplicationRegistrationData(
response.getMaximumResourceCapability(),
response.getApplicationACLs(),
response.getClientToAMTokenMasterKey());
@@ -387,7 +358,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
@Override
- public void serviceStop() throws InterruptedException {
+ public void shutdown() throws InterruptedException {
// upcall to app outside of locks
try {
delayedContainerManager.shutdown();
@@ -396,7 +367,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
synchronized (this) {
isStopped.set(true);
if (shouldUnregister.get()) {
- AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+ AppFinalStatus status = getContext().getFinalAppStatus();
LOG.info("Unregistering application from RM"
+ ", exitStatus=" + status.exitStatus
+ ", exitMessage=" + status.exitMessage
@@ -413,8 +384,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// operation and at the same time the callback operation might be trying
// to get our lock.
amRmClient.stop();
- appCallbackExecutor.shutdown();
- appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
} catch (YarnException e) {
LOG.error("Yarn Exception while unregistering ", e);
throw new TezUncheckedException(e);
@@ -478,7 +447,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// upcall to app must be outside locks
for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
- appClientDelegate.containerCompleted(entry.getKey(), entry.getValue());
+ getContext().containerCompleted(entry.getKey(), entry.getValue());
}
}
@@ -528,7 +497,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
private synchronized Map<CookieContainerRequest, Container>
assignNewlyAllocatedContainers(Iterable<Container> containers) {
- boolean amInCompletionState = appContext.isAMInCompletionState();
+ boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED);
Map<CookieContainerRequest, Container> assignedContainers =
new HashMap<CookieContainerRequest, Container>();
@@ -550,7 +519,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
private synchronized Map<CookieContainerRequest, Container>
tryAssignReUsedContainers(Iterable<Container> containers) {
- boolean amInCompletionState = appContext.isAMInCompletionState();
+ boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED);
Map<CookieContainerRequest, Container> assignedContainers =
new HashMap<CookieContainerRequest, Container>();
@@ -590,7 +559,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
private synchronized Map<CookieContainerRequest, Container>
assignDelayedContainer(HeldContainer heldContainer) {
- DAGAppMasterState state = appContext.getAMState();
+ AMState state = getContext().getAMState();
boolean isNew = heldContainer.isNew();
if (LOG.isDebugEnabled()) {
@@ -606,13 +575,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
+ ", isNew=" + isNew);
}
- if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
+ if (state.equals(AMState.IDLE) || taskRequests.isEmpty()) {
// reset locality level on held container
// if sessionDelay defined, push back into delayed queue if not already
// done so
// Compute min held containers.
- if (appContext.isSession() && sessionNumMinHeldContainers > 0 &&
+ if (getContext().isSession() && sessionNumMinHeldContainers > 0 &&
sessionMinHeldContainers.isEmpty()) {
// session mode and need to hold onto containers and not done so already
determineMinHeldContainers();
@@ -626,7 +595,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
&& idleContainerTimeoutMin != -1)) {
// container idle timeout has expired or is a new unused container.
// new container is possibly a spurious race condition allocation.
- if (appContext.isSession()
+ if (getContext().isSession()
&& sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) {
// There are no outstanding requests. So its safe to hold new containers.
// We may have received more containers than necessary and some are unused
@@ -667,7 +636,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
heldContainer.getContainer(), currentTime
+ localitySchedulingDelay);
}
- } else if (state.equals(DAGAppMasterState.RUNNING)) {
+ } else if (state.equals(AMState.RUNNING_APP)) {
// clear min held containers since we need to allocate to tasks
if (!sessionMinHeldContainers.isEmpty()) {
// update the expire time of min held containers so that they are
@@ -806,12 +775,12 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
// Are there any pending requests at any priority?
// release if there are tasks or this is not a session
if (safeToRelease &&
- (!taskRequests.isEmpty() || !appContext.isSession())) {
+ (!taskRequests.isEmpty() || !getContext().isSession())) {
LOG.info("Releasing held container as either there are pending but "
+ " unmatched requests or this is not a session"
+ ", containerId=" + heldContainer.container.getId()
+ ", pendingTasks=" + taskRequests.size()
- + ", isSession=" + appContext.isSession()
+ + ", isSession=" + getContext().isSession()
+ ". isNew=" + isNew);
releaseUnassignedContainers(
Lists.newArrayList(heldContainer.container));
@@ -862,7 +831,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
return;
}
// upcall to app must be outside locks
- appClientDelegate.appShutdownRequested();
+ getContext().appShutdownRequested();
}
@Override
@@ -872,7 +841,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
}
// ignore bad nodes for now
// upcall to app must be outside locks
- appClientDelegate.nodesUpdated(updatedNodes);
+ getContext().nodesUpdated(updatedNodes);
}
@Override
@@ -894,7 +863,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
numHeartbeats++;
preemptIfNeeded();
- return appClientDelegate.getProgress();
+ return getContext().getProgress();
}
@Override
@@ -902,7 +871,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
if (isStopped.get()) {
return;
}
- appClientDelegate.onError(t);
+ getContext().onError(t);
}
@Override
@@ -1289,7 +1258,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
ContainerId cId = preemptedContainers[i];
if (cId != null) {
LOG.info("Preempting container: " + cId + " currently allocated to a task.");
- appClientDelegate.preemptContainer(cId);
+ getContext().preemptContainer(cId);
}
}
}
@@ -1422,7 +1391,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
Object assignedTask = containerAssignments.remove(containerId);
if (assignedTask != null) {
// A task was assigned to this container at some point. Inform the app.
- appClientDelegate.containerBeingReleased(containerId);
+ getContext().containerBeingReleased(containerId);
}
HeldContainer delayedContainer = heldContainers.remove(containerId);
if (delayedContainer != null) {
@@ -1626,7 +1595,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
private void informAppAboutAssignment(CookieContainerRequest assigned,
Container container) {
- appClientDelegate.taskAllocated(getTask(assigned),
+ getContext().taskAllocated(getTask(assigned),
assigned.getCookie().getAppCookie(), container);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 5cff766..aeacf84 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 938096d..fcb9eaf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.common.ContainerSignatureMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.service.AbstractService;
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
index 211c537..436f098 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.app.ContainerContext;
import com.google.common.base.Preconditions;
+import org.apache.tez.common.ContainerSignatureMatcher;
public class ContainerContextMatcher implements ContainerSignatureMatcher {
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
deleted file mode 100644
index 0f9c2d6..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm.container;
-
-import java.util.Map;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-
-public interface ContainerSignatureMatcher {
- /**
- * Checks the compatibility between the specified container signatures.
- *
- * @return true if the first signature is a super set of the second
- * signature.
- */
- public boolean isSuperSet(Object cs1, Object cs2);
-
- /**
- * Checks if the container signatures match exactly
- * @return true if exact match
- */
- public boolean isExactMatch(Object cs1, Object cs2);
-
- /**
- * Gets additional resources specified in lr2, which are not present for lr1
- *
- * @param lr1
- * @param lr2
- * @return additional resources specified in lr2, which are not present for lr1
- */
- public Map<String, LocalResource> getAdditionalResources(Map<String, LocalResource> lr1,
- Map<String, LocalResource> lr2);
-
-
- /**
- * Do a union of 2 signatures
- * Pre-condition. This function should only be invoked iff cs1 is compatible with cs2.
- * i.e. isSuperSet should not return false.
- * @param cs1 Signature 1 Original signature
- * @param cs2 Signature 2 New signature
- * @return Union of 2 signatures
- */
- public Object union(Object cs1, Object cs2);
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index e37ab4a..88f6066 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -46,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -68,16 +66,14 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.CapturingEventHandler;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
@@ -116,14 +112,13 @@ public class TestContainerReuse {
conf.setBoolean(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
conf.setBoolean(
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false);
conf.setBoolean(
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false);
conf.setLong(
TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 3000l);
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
@@ -132,12 +127,6 @@ public class TestContainerReuse {
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient =
spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(conf).when(appContext).getAMConf();
@@ -161,11 +150,11 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(conf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler =
- (TaskSchedulerWithDrainableAppCallback)
+ TaskSchedulerWithDrainableContext taskScheduler =
+ (TaskSchedulerWithDrainableContext)
((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback =
+ TaskSchedulerContextDrainable drainableAppCallback =
taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -251,8 +240,7 @@ public class TestContainerReuse {
}
}
assertTrue("containerHost2 was not released", exception == null);
- taskScheduler.stop();
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
@@ -267,7 +255,6 @@ public class TestContainerReuse {
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1000l);
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
@@ -276,12 +263,6 @@ public class TestContainerReuse {
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient =
spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -304,11 +285,11 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(conf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler =
- (TaskSchedulerWithDrainableAppCallback)
+ TaskSchedulerWithDrainableContext taskScheduler =
+ (TaskSchedulerWithDrainableContext)
((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback =
+ TaskSchedulerContextDrainable drainableAppCallback =
taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -366,8 +347,7 @@ public class TestContainerReuse {
eq(containerHost2.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
- taskScheduler.stop();
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
@@ -380,19 +360,12 @@ public class TestContainerReuse {
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -410,9 +383,9 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -504,7 +477,7 @@ public class TestContainerReuse {
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
eventHandler.reset();
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
@@ -522,19 +495,11 @@ public class TestContainerReuse {
tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS, "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__");
TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(tezConf);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -554,10 +519,10 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler =
- (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ TaskSchedulerWithDrainableContext taskScheduler =
+ (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -705,7 +670,7 @@ public class TestContainerReuse {
verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
eventHandler.reset();
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
@@ -721,20 +686,12 @@ public class TestContainerReuse {
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 1000l);
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 1000l);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient =
spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -758,11 +715,11 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler =
- (TaskSchedulerWithDrainableAppCallback)
+ TaskSchedulerWithDrainableContext taskScheduler =
+ (TaskSchedulerWithDrainableContext)
((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback =
+ TaskSchedulerContextDrainable drainableAppCallback =
taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -836,7 +793,7 @@ public class TestContainerReuse {
verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
@@ -853,20 +810,12 @@ public class TestContainerReuse {
tezConf.setInt(
TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient =
spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -890,11 +839,11 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler =
- (TaskSchedulerWithDrainableAppCallback)
+ TaskSchedulerWithDrainableContext taskScheduler =
+ (TaskSchedulerWithDrainableContext)
((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -965,7 +914,7 @@ public class TestContainerReuse {
// container should not get released due to min held containers
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
@@ -979,19 +928,11 @@ public class TestContainerReuse {
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -1011,9 +952,9 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -1129,7 +1070,7 @@ public class TestContainerReuse {
assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
eventHandler.reset();
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
@@ -1143,19 +1084,11 @@ public class TestContainerReuse {
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
-
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -1177,9 +1110,9 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -1291,7 +1224,7 @@ public class TestContainerReuse {
verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
eventHandler.reset();
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
@@ -1305,19 +1238,12 @@ public class TestContainerReuse {
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
CapturingEventHandler eventHandler = new CapturingEventHandler();
TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
- String appUrl = "url";
- String appMsg = "success";
- AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
-
- doReturn(finalStatus).when(mockApp).getFinalAppStatus();
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
@@ -1326,7 +1252,7 @@ public class TestContainerReuse {
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
doReturn(amNodeTracker).when(appContext).getNodeTracker();
- doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+ doReturn(DAGAppMasterState.SUCCEEDED).when(appContext).getAMState();
doReturn(true).when(appContext).isAMInCompletionState();
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
@@ -1338,10 +1264,10 @@ public class TestContainerReuse {
taskSchedulerEventHandler.init(tezConf);
taskSchedulerEventHandler.start();
- TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback)
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext)
((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
.getSpyTaskScheduler();
- TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -1369,7 +1295,7 @@ public class TestContainerReuse {
drainableAppCallback.drain();
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11),
any(Object.class), eq(container1));
- taskScheduler.close();
+ taskScheduler.shutdown();
taskSchedulerEventHandler.close();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
index 12390b2..2ada2f1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
@@ -18,12 +18,12 @@
package org.apache.tez.dag.app.rm;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.junit.Assert;
import org.junit.Test;
@@ -33,24 +33,12 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.AsyncDelegateRequestHandler;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.LocalContainerFactory;
import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.TaskRequest;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
public class TestLocalTaskScheduler {
- public AppContext createMockAppContext() {
-
- ApplicationId appId = ApplicationId.newInstance(2000, 1);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-
- AppContext appContext = mock(AppContext.class);
- doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-
- return appContext;
- }
@Test(timeout = 5000)
public void maxTasksAllocationsCannotBeExceeded() {
@@ -59,17 +47,24 @@ public class TestLocalTaskScheduler {
TezConfiguration tezConf = new TezConfiguration();
tezConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, MAX_TASKS);
- LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext(), 1000);
+ ApplicationId appId = ApplicationId.newInstance(2000, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+ TaskSchedulerContext
+ mockContext = TestTaskSchedulerHelpers.setupMockTaskSchedulerContext("", 0, "", true,
+ appAttemptId, 1000l, null, new Configuration());
+
+ LocalContainerFactory containerFactory = new LocalContainerFactory(appAttemptId, 1000);
+
HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>();
PriorityBlockingQueue<TaskRequest> taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
- TaskSchedulerAppCallback appClientDelegate = mock(TaskSchedulerAppCallback.class);
// Object under test
AsyncDelegateRequestHandler requestHandler =
new AsyncDelegateRequestHandler(taskRequestQueue,
containerFactory,
taskAllocations,
- appClientDelegate,
+ mockContext,
tezConf);
// Allocate up to max tasks
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index b555c62..c637f5f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -27,11 +27,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
import org.apache.tez.dag.app.rm.TestLocalTaskSchedulerService.MockLocalTaskSchedulerSerivce.MockAsyncDelegateRequestHandler;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.junit.Assert;
import org.junit.Test;
@@ -82,14 +80,15 @@ public class TestLocalTaskSchedulerService {
* Normal flow of TaskAttempt
*/
@Test(timeout = 5000)
- public void testDeallocationBeforeAllocation() {
- AppContext appContext = mock(AppContext.class);
+ public void testDeallocationBeforeAllocation() throws InterruptedException {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);
- doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
- MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
- (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext);
- taskSchedulerService.init(new Configuration());
+
+ TaskSchedulerContext mockContext = TestTaskSchedulerHelpers
+ .setupMockTaskSchedulerContext("", 0, "", false, appAttemptId, 10000l, null, new Configuration());
+
+ MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce(mockContext);
+ taskSchedulerService.initialize();
taskSchedulerService.start();
Task task = mock(Task.class);
@@ -103,21 +102,24 @@ public class TestLocalTaskSchedulerService {
assertEquals(1, requestHandler.deallocateCount);
// The corresponding AllocateTaskRequest will be removed, so won't been processed.
assertEquals(0, requestHandler.allocateCount);
- taskSchedulerService.stop();
+ taskSchedulerService.shutdown();
}
/**
* TaskAttempt Killed from START_WAIT
*/
@Test(timeout = 5000)
- public void testDeallocationAfterAllocation() {
- AppContext appContext = mock(AppContext.class);
+ public void testDeallocationAfterAllocation() throws InterruptedException {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);
- doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
- MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
- (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext);
- taskSchedulerService.init(new Configuration());
+
+ TaskSchedulerContext mockContext = TestTaskSchedulerHelpers
+ .setupMockTaskSchedulerContext("", 0, "", false, appAttemptId, 10000l, null, new Configuration());
+
+ MockLocalTaskSchedulerSerivce taskSchedulerService =
+ new MockLocalTaskSchedulerSerivce(mockContext);
+
+ taskSchedulerService.initialize();
taskSchedulerService.start();
Task task = mock(Task.class);
@@ -130,33 +132,29 @@ public class TestLocalTaskSchedulerService {
requestHandler.drainRequest(2);
assertEquals(1, requestHandler.deallocateCount);
assertEquals(1, requestHandler.allocateCount);
- taskSchedulerService.stop();
+ taskSchedulerService.shutdown();
}
static class MockLocalTaskSchedulerSerivce extends LocalTaskSchedulerService {
private MockAsyncDelegateRequestHandler requestHandler;
- public MockLocalTaskSchedulerSerivce(TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher,
- String appHostName, int appHostPort, String appTrackingUrl,
- AppContext appContext) {
- super(appClient, containerSignatureMatcher, appHostName, appHostPort,
- appTrackingUrl, 10000l, appContext);
+ public MockLocalTaskSchedulerSerivce(TaskSchedulerContext appClient) {
+ super(appClient);
}
@Override
public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue,
- new LocalContainerFactory(appContext, customContainerAppId),
+ new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId),
taskAllocations,
- appClientDelegate,
+ getContext(),
conf);
return requestHandler;
}
@Override
- public void serviceStart() {
+ public void start() {
// don't start RequestHandler thread, control it in unit test
}
@@ -178,7 +176,7 @@ public class TestLocalTaskSchedulerService {
BlockingQueue<TaskRequest> taskRequestQueue,
LocalContainerFactory localContainerFactory,
HashMap<Object, Container> taskAllocations,
- TaskSchedulerAppCallback appClientDelegate, Configuration conf) {
+ TaskSchedulerContext appClientDelegate, Configuration conf) {
super(taskRequestQueue, localContainerFactory, taskAllocations,
appClientDelegate, conf);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 807e772..123a4d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -18,6 +18,8 @@
package org.apache.tez.dag.app.rm;
+import static org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.createCountingExecutingService;
+import static org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.setupMockTaskSchedulerContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -42,8 +44,11 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -59,23 +64,21 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -88,23 +91,39 @@ import com.google.common.collect.Sets;
public class TestTaskScheduler {
- RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
-
static ContainerSignatureMatcher containerSignatureMatcher = new AlwaysMatchesContainerMatcher();
+ private ExecutorService contextCallbackExecutor;
@BeforeClass
public static void beforeClass() {
MockDNSToSwitchMapping.initializeMockRackResolver();
}
+ @Before
+ public void preTest() {
+ contextCallbackExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
+ .setDaemon(true)
+ .build());
+ }
+
+ @After
+ public void postTest() {
+ contextCallbackExecutor.shutdownNow();
+ }
+
+ private TaskSchedulerContextDrainable createDrainableContext(
+ TaskSchedulerContext taskSchedulerContext) {
+ TaskSchedulerContextImplWrapper wrapper =
+ new TaskSchedulerContextImplWrapper(taskSchedulerContext,
+ createCountingExecutingService(contextCallbackExecutor));
+ return new TaskSchedulerContextDrainable(wrapper);
+ }
+
@SuppressWarnings({ "unchecked" })
@Test(timeout=10000)
public void testTaskSchedulerNoReuse() throws Exception {
RackResolver.init(new YarnConfiguration());
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
- AppContext mockAppContext = mock(AppContext.class);
- when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(TezAMRMClientAsync.class);
@@ -112,18 +131,19 @@ public class TestTaskScheduler {
String appHost = "host";
int appPort = 0;
String appUrl = "url";
- TaskSchedulerWithDrainableAppCallback scheduler =
- new TaskSchedulerWithDrainableAppCallback(
- mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
- appUrl, mockRMClient, mockAppContext);
- TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
- .getDrainableAppCallback();
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
int interval = 100;
conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, interval);
- scheduler.init(conf);
+
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+ TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+ TaskSchedulerWithDrainableContext scheduler =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+ scheduler.initialize();
drainableAppCallback.drain();
verify(mockRMClient).init(conf);
verify(mockRMClient).setHeartbeatInterval(interval);
@@ -495,22 +515,18 @@ public class TestTaskScheduler {
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
- scheduler.stop();
+ scheduler.shutdown();
drainableAppCallback.drain();
verify(mockRMClient).
unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
- appMsg, appUrl);
+ appMsg, appUrl);
verify(mockRMClient).stop();
- scheduler.close();
}
@SuppressWarnings({ "unchecked" })
@Test(timeout=10000)
public void testTaskSchedulerWithReuse() throws Exception {
RackResolver.init(new YarnConfiguration());
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
- AppContext mockAppContext = mock(AppContext.class);
- when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(TezAMRMClientAsync.class);
@@ -518,12 +534,6 @@ public class TestTaskScheduler {
String appHost = "host";
int appPort = 0;
String appUrl = "url";
- TaskSchedulerWithDrainableAppCallback scheduler =
- new TaskSchedulerWithDrainableAppCallback(
- mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
- appUrl, mockRMClient, mockAppContext);
- final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
- .getDrainableAppCallback();
Configuration conf = new Configuration();
// to match all in the same pass
@@ -531,7 +541,15 @@ public class TestTaskScheduler {
// to release immediately after deallocate
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
- scheduler.init(conf);
+
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+ final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+ TaskSchedulerWithDrainableContext scheduler =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+
+ scheduler.initialize();
drainableAppCallback.drain();
RegisterApplicationMasterResponse mockRegResponse =
@@ -992,23 +1010,18 @@ public class TestTaskScheduler {
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
- scheduler.stop();
+ scheduler.shutdown();
drainableAppCallback.drain();
verify(mockRMClient).
unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
- appMsg, appUrl);
+ appMsg, appUrl);
verify(mockRMClient).stop();
- scheduler.close();
}
@SuppressWarnings("unchecked")
@Test (timeout=5000)
public void testTaskSchedulerDetermineMinHeldContainers() throws Exception {
RackResolver.init(new YarnConfiguration());
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
- AppContext mockAppContext = mock(AppContext.class);
- when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
- when(mockAppContext.isSession()).thenReturn(true);
TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(TezAMRMClientAsync.class);
@@ -1016,15 +1029,15 @@ public class TestTaskScheduler {
String appHost = "host";
int appPort = 0;
String appUrl = "url";
- TaskSchedulerWithDrainableAppCallback scheduler =
- new TaskSchedulerWithDrainableAppCallback(
- mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
- appUrl, mockRMClient, mockAppContext);
- TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
- .getDrainableAppCallback();
- Configuration conf = new Configuration();
- scheduler.init(conf);
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, true,
+ new Configuration());
+ final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+ TaskSchedulerWithDrainableContext scheduler =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+ scheduler.initialize();
RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
Resource mockMaxResource = mock(Resource.class);
Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
@@ -1176,17 +1189,13 @@ public class TestTaskScheduler {
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
- scheduler.stop();
- scheduler.close();
+ scheduler.shutdown();
}
@SuppressWarnings("unchecked")
@Test(timeout=5000)
public void testTaskSchedulerRandomReuseExpireTime() throws Exception {
RackResolver.init(new YarnConfiguration());
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
- AppContext mockAppContext = mock(AppContext.class);
- when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(TezAMRMClientAsync.class);
@@ -1194,25 +1203,31 @@ public class TestTaskScheduler {
String appHost = "host";
int appPort = 0;
String appUrl = "url";
- TaskSchedulerWithDrainableAppCallback scheduler1 =
- new TaskSchedulerWithDrainableAppCallback(
- mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
- appUrl, mockRMClient, mockAppContext);
- TaskSchedulerWithDrainableAppCallback scheduler2 =
- new TaskSchedulerWithDrainableAppCallback(
- mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort,
- appUrl, mockRMClient, mockAppContext);
long minTime = 1000l;
long maxTime = 100000l;
Configuration conf1 = new Configuration();
conf1.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime);
conf1.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, minTime);
- scheduler1.init(conf1);
+
Configuration conf2 = new Configuration();
conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime);
conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, maxTime);
- scheduler2.init(conf2);
+
+ TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf1);
+ TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf2);
+ final TaskSchedulerContextDrainable drainableAppCallback1 = createDrainableContext(mockApp1);
+ final TaskSchedulerContextDrainable drainableAppCallback2 = createDrainableContext(mockApp2);
+
+
+ TaskSchedulerWithDrainableContext scheduler1 =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback1, mockRMClient);
+ TaskSchedulerWithDrainableContext scheduler2 =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback2, mockRMClient);
+
+ scheduler1.initialize();
+ scheduler2.initialize();
+
RegisterApplicationMasterResponse mockRegResponse =
mock(RegisterApplicationMasterResponse.class);
@@ -1250,20 +1265,16 @@ public class TestTaskScheduler {
String appMsg = "success";
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
- when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
- scheduler1.stop();
- scheduler1.close();
- scheduler2.stop();
- scheduler2.close();
+ when(mockApp1.getFinalAppStatus()).thenReturn(finalStatus);
+ when(mockApp2.getFinalAppStatus()).thenReturn(finalStatus);
+ scheduler1.shutdown();
+ scheduler2.shutdown();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test (timeout=5000)
public void testTaskSchedulerPreemption() throws Exception {
RackResolver.init(new YarnConfiguration());
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
- AppContext mockAppContext = mock(AppContext.class);
- when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(TezAMRMClientAsync.class);
@@ -1271,16 +1282,18 @@ public class TestTaskScheduler {
String appHost = "host";
int appPort = 0;
String appUrl = "url";
- final TaskSchedulerWithDrainableAppCallback scheduler =
- new TaskSchedulerWithDrainableAppCallback(
- mockApp, new PreemptionMatcher(), appHost, appPort,
- appUrl, mockRMClient, mockAppContext);
- TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler
- .getDrainableAppCallback();
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
- scheduler.init(conf);
+
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false,
+ null, null, new PreemptionMatcher(), conf);
+ final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+
+ final TaskSchedulerWithDrainableContext scheduler =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+ scheduler.initialize();
RegisterApplicationMasterResponse mockRegResponse =
mock(RegisterApplicationMasterResponse.class);
@@ -1530,7 +1543,7 @@ public class TestTaskScheduler {
scheduler.getProgress();
scheduler.getProgress();
scheduler.getProgress();
- verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
+ verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId) any());
scheduler.getProgress();
drainableAppCallback.drain();
// Next oldest mockTaskPri3KillA gets preempted to clear 10% of outstanding running preemptable tasks
@@ -1540,9 +1553,8 @@ public class TestTaskScheduler {
AppFinalStatus finalStatus =
new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
- scheduler.stop();
+ scheduler.shutdown();
drainableAppCallback.drain();
- scheduler.close();
}
@SuppressWarnings("unchecked")
@@ -1550,22 +1562,19 @@ public class TestTaskScheduler {
public void testLocalityMatching() throws Exception {
RackResolver.init(new Configuration());
- TaskSchedulerAppCallback appClient = mock(TaskSchedulerAppCallback.class);
TezAMRMClientAsync<CookieContainerRequest> amrmClient =
mock(TezAMRMClientAsync.class);
- AppContext mockAppContext = mock(AppContext.class);
- when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
-
- TaskSchedulerWithDrainableAppCallback taskScheduler =
- new TaskSchedulerWithDrainableAppCallback(
- appClient, new AlwaysMatchesContainerMatcher(), "host", 0, "",
- amrmClient, mockAppContext);
- TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler
- .getDrainableAppCallback();
-
+
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
- taskScheduler.init(conf);
+
+ TaskSchedulerContext appClient = setupMockTaskSchedulerContext("host", 0, "", conf);
+ final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(appClient);
+
+ TaskSchedulerWithDrainableContext taskScheduler =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback, amrmClient);
+
+ taskScheduler.initialize();
RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
Resource mockMaxResource = mock(Resource.class);
@@ -1693,7 +1702,7 @@ public class TestTaskScheduler {
AppFinalStatus finalStatus = new AppFinalStatus(
FinalApplicationStatus.SUCCEEDED, "", "");
when(appClient.getFinalAppStatus()).thenReturn(finalStatus);
- taskScheduler.close();
+ taskScheduler.shutdown();
}
@Test (timeout=5000)
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 005692e..3ea0446 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -47,11 +47,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
@@ -61,10 +63,10 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.AMContainerState;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -96,6 +98,7 @@ public class TestTaskSchedulerEventHandler {
protected void instantiateScheduelrs(String host, int port, String trackingUrl,
AppContext appContext) {
taskSchedulers[0] = mockTaskScheduler;
+ taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]);
}
@Override
@@ -113,7 +116,7 @@ public class TestTaskSchedulerEventHandler {
TestEventHandler mockEventHandler;
ContainerSignatureMatcher mockSigMatcher;
MockTaskSchedulerEventHandler schedulerHandler;
- TaskSchedulerService mockTaskScheduler;
+ TaskScheduler mockTaskScheduler;
AMContainerMap mockAMContainerMap;
WebUIService mockWebUIService;
@@ -124,7 +127,7 @@ public class TestTaskSchedulerEventHandler {
mockClientService = mock(DAGClientServer.class);
mockEventHandler = new TestEventHandler();
mockSigMatcher = mock(ContainerSignatureMatcher.class);
- mockTaskScheduler = mock(TaskSchedulerService.class);
+ mockTaskScheduler = mock(TaskScheduler.class);
mockAMContainerMap = mock(AMContainerMap.class);
mockWebUIService = mock(WebUIService.class);
when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 04610ab..966c95a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -40,9 +40,13 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -54,13 +58,12 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
class TestTaskSchedulerHelpers {
@@ -134,12 +137,19 @@ class TestTaskSchedulerHelpers {
@Override
public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
- taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(new TaskSchedulerAppCallbackImpl(this, 0),
- containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync,
- appContext);
- }
-
- public TaskSchedulerService getSpyTaskScheduler() {
+ TaskSchedulerContext taskSchedulerContext =
+ new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port,
+ getConfig());
+ TaskSchedulerContextImplWrapper wrapper =
+ new TaskSchedulerContextImplWrapper(taskSchedulerContext,
+ new CountingExecutorService(appCallbackExecutor));
+ TaskSchedulerContextDrainable drainable = new TaskSchedulerContextDrainable(wrapper);
+ taskSchedulers[0] =
+ new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync);
+ taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]);
+ }
+
+ public TaskScheduler getSpyTaskScheduler() {
return taskSchedulers[0];
}
@@ -147,8 +157,8 @@ class TestTaskSchedulerHelpers {
public void serviceStart() {
instantiateScheduelrs("host", 0, "", appContext);
// Init the service so that reuse configuration is picked up.
- ((AbstractService)taskSchedulers[0]).init(getConfig());
- ((AbstractService)taskSchedulers[0]).start();
+ ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig());
+ ((AbstractService)taskSchedulerServiceWrappers[0]).start();
taskSchedulers[0] = spy(taskSchedulers[0]);
}
@@ -188,61 +198,31 @@ class TestTaskSchedulerHelpers {
}
}
- static class TaskSchedulerWithDrainableAppCallback extends YarnTaskSchedulerService {
+ static class TaskSchedulerWithDrainableContext extends YarnTaskSchedulerService {
- private TaskSchedulerAppCallbackDrainable drainableAppCallback;
- public TaskSchedulerWithDrainableAppCallback(
- TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher,
- String appHostName, int appHostPort, String appTrackingUrl,
- AppContext appContext) {
- super(appClient, containerSignatureMatcher, appHostName, appHostPort,
- appTrackingUrl, appContext);
+ public TaskSchedulerWithDrainableContext(
+ TaskSchedulerContextDrainable appClient,
+ TezAMRMClientAsync<CookieContainerRequest> client) {
+ super(appClient, client);
shouldUnregister.set(true);
}
- public TaskSchedulerWithDrainableAppCallback(
- TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher,
- String appHostName, int appHostPort, String appTrackingUrl,
- TezAMRMClientAsync<CookieContainerRequest> client,
- AppContext appContext) {
- super(appClient, containerSignatureMatcher, appHostName, appHostPort,
- appTrackingUrl, client, appContext);
- shouldUnregister.set(true);
- }
-
- @Override
- TaskSchedulerAppCallback createAppCallbackDelegate(
- TaskSchedulerAppCallback realAppClient) {
- drainableAppCallback = new TaskSchedulerAppCallbackDrainable(
- new TaskSchedulerAppCallbackWrapper(realAppClient,
- appCallbackExecutor));
- return drainableAppCallback;
- }
-
- @Override
- ExecutorService createAppCallbackExecutorService() {
- ExecutorService real = super.createAppCallbackExecutorService();
- return new CountingExecutorService(real);
- }
-
- public TaskSchedulerAppCallbackDrainable getDrainableAppCallback() {
- return drainableAppCallback;
+ public TaskSchedulerContextDrainable getDrainableAppCallback() {
+ return (TaskSchedulerContextDrainable)getContext();
}
}
@SuppressWarnings("rawtypes")
- static class TaskSchedulerAppCallbackDrainable implements TaskSchedulerAppCallback {
+ static class TaskSchedulerContextDrainable implements TaskSchedulerContext {
int completedEvents;
int invocations;
- private TaskSchedulerAppCallback real;
+ private TaskSchedulerContext real;
private CountingExecutorService countingExecutorService;
final AtomicInteger count = new AtomicInteger(0);
- public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) {
- countingExecutorService = (CountingExecutorService) real.executorService;
+ public TaskSchedulerContextDrainable(TaskSchedulerContextImplWrapper real) {
+ countingExecutorService = (CountingExecutorService) real.getExecutorService();
this.real = real;
}
@@ -303,6 +283,53 @@ class TestTaskSchedulerHelpers {
return real.getFinalAppStatus();
}
+ // Not incrementing invocations for methods which to not obtain locks,
+ // and do not go via the executor service.
+ @Override
+ public Configuration getInitialConfiguration() {
+ return real.getInitialConfiguration();
+ }
+
+ @Override
+ public String getAppTrackingUrl() {
+ return real.getAppTrackingUrl();
+ }
+
+ @Override
+ public long getCustomClusterIdentifier() {
+ return real.getCustomClusterIdentifier();
+ }
+
+ @Override
+ public ContainerSignatureMatcher getContainerSignatureMatcher() {
+ return real.getContainerSignatureMatcher();
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return real.getApplicationAttemptId();
+ }
+
+ @Override
+ public String getAppHostName() {
+ return real.getAppHostName();
+ }
+
+ @Override
+ public int getAppClientPort() {
+ return real.getAppClientPort();
+ }
+
+ @Override
+ public boolean isSession() {
+ return real.isSession();
+ }
+
+ @Override
+ public AMState getAMState() {
+ return real.getAMState();
+ }
+
@Override
public void preemptContainer(ContainerId cId) {
invocations++;
@@ -384,7 +411,11 @@ class TestTaskSchedulerHelpers {
}
}
}
-
+
+ static CountingExecutorService createCountingExecutingService(ExecutorService rawExecutor) {
+ return new CountingExecutorService(rawExecutor);
+ }
+
@SuppressWarnings({"rawtypes", "unchecked"})
private static class CountingExecutorService implements ExecutorService {
@@ -464,7 +495,50 @@ class TestTaskSchedulerHelpers {
throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException("Not expected to be used");
}
-
+ }
+
+ static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort,
+ String appUrl, Configuration conf) {
+ return setupMockTaskSchedulerContext(appHost, appPort, appUrl, false, conf);
+ }
+
+ static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort,
+ String appUrl, boolean isSession,
+ Configuration conf) {
+ return setupMockTaskSchedulerContext(appHost, appPort, appUrl, isSession, null, null, null,
+ conf);
+ }
+
+ static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort,
+ String appUrl, boolean isSession,
+ ApplicationAttemptId appAttemptId,
+ Long customAppIdentifier,
+ ContainerSignatureMatcher containerSignatureMatcher,
+ Configuration conf) {
+
+ TaskSchedulerContext mockContext = mock(TaskSchedulerContext.class);
+ when(mockContext.getAppHostName()).thenReturn(appHost);
+ when(mockContext.getAppClientPort()).thenReturn(appPort);
+ when(mockContext.getAppTrackingUrl()).thenReturn(appUrl);
+
+ when(mockContext.getAMState()).thenReturn(TaskSchedulerContext.AMState.RUNNING_APP);
+ when(mockContext.getInitialConfiguration()).thenReturn(conf);
+ when(mockContext.isSession()).thenReturn(isSession);
+ if (containerSignatureMatcher != null) {
+ when(mockContext.getContainerSignatureMatcher())
+ .thenReturn(containerSignatureMatcher);
+ } else {
+ when(mockContext.getContainerSignatureMatcher())
+ .thenReturn(new AlwaysMatchesContainerMatcher());
+ }
+ if (appAttemptId != null) {
+ when(mockContext.getApplicationAttemptId()).thenReturn(appAttemptId);
+ }
+ if (customAppIdentifier != null) {
+ when(mockContext.getCustomClusterIdentifier()).thenReturn(customAppIdentifier);
+ }
+
+ return mockContext;
}
}