You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:50 UTC
[32/50] [abbrv] tez git commit: TEZ-2005. Define basic interface for
pluggable TaskScheduler. (sseth)
TEZ-2005. Define basic interface for pluggable TaskScheduler. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/82c24ac0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/82c24ac0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/82c24ac0
Branch: refs/heads/TEZ-2003
Commit: 82c24ac0bcfc383fce9e069272fda5aa84e180c2
Parents: af1cc72
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jul 22 22:25:01 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:45 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../tez/common/ContainerSignatureMatcher.java | 64 ++++
.../tez/common/ServicePluginLifecycle.java | 39 ++
.../tez/serviceplugins/api/TaskScheduler.java | 85 +++++
.../api/TaskSchedulerContext.java | 114 ++++++
.../org/apache/tez/common/TezUtilsInternal.java | 1 +
.../tez/dag/api/TaskCommunicatorInterface.java | 18 -
.../org/apache/tez/dag/app/DAGAppMaster.java | 3 +-
.../ServicePluginLifecycleAbstractService.java | 52 +++
.../dag/app/rm/LocalTaskSchedulerService.java | 77 ++--
.../app/rm/TaskSchedulerAppCallbackImpl.java | 89 -----
.../app/rm/TaskSchedulerAppCallbackWrapper.java | 307 ----------------
.../dag/app/rm/TaskSchedulerContextImpl.java | 174 +++++++++
.../app/rm/TaskSchedulerContextImplWrapper.java | 368 +++++++++++++++++++
.../dag/app/rm/TaskSchedulerEventHandler.java | 81 ++--
.../tez/dag/app/rm/TaskSchedulerService.java | 111 ------
.../dag/app/rm/YarnTaskSchedulerService.java | 121 +++---
.../dag/app/rm/container/AMContainerImpl.java | 1 +
.../dag/app/rm/container/AMContainerMap.java | 1 +
.../rm/container/ContainerContextMatcher.java | 1 +
.../rm/container/ContainerSignatureMatcher.java | 60 ---
.../tez/dag/app/rm/TestContainerReuse.java | 148 ++------
.../tez/dag/app/rm/TestLocalTaskScheduler.java | 29 +-
.../app/rm/TestLocalTaskSchedulerService.java | 52 ++-
.../tez/dag/app/rm/TestTaskScheduler.java | 201 +++++-----
.../app/rm/TestTaskSchedulerEventHandler.java | 9 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 186 +++++++---
.../rm/TezTestServiceTaskSchedulerService.java | 66 +---
28 files changed, 1357 insertions(+), 1102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 88dd0c7..a51669d 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -35,5 +35,6 @@ ALL CHANGES:
TEZ-2621. rebase 07/14
TEZ-2124. Change Node tracking to work per external container source.
TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
+ TEZ-2005. Define basic interface for pluggable TaskScheduler.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java b/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java
new file mode 100644
index 0000000..c0a1245
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java
@@ -0,0 +1,64 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ContainerSignatureMatcher {
+ /**
+ * Checks the compatibility between the specified container signatures.
+ *
+ * @return true if the first signature is a superset of the second
+ * signature.
+ */
+ public boolean isSuperSet(Object cs1, Object cs2);
+
+ /**
+ * Checks if the container signatures match exactly
+ * @return true if exact match
+ */
+ public boolean isExactMatch(Object cs1, Object cs2);
+
+ /**
+ * Gets additional resources specified in lr2, which are not present for lr1
+ *
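+ * @param lr1 the local resources used as the baseline
+ * @param lr2 the local resources checked for entries not present for lr1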
+ * @return additional resources specified in lr2, which are not present for lr1
+ */
+ public Map<String, LocalResource> getAdditionalResources(Map<String, LocalResource> lr1,
+ Map<String, LocalResource> lr2);
+
+
+ /**
+ * Do a union of 2 signatures
+ * Pre-condition: this function should be invoked only if cs1 is compatible with cs2,
+ * i.e. isSuperSet must not return false for them.
+ * @param cs1 Signature 1 Original signature
+ * @param cs2 Signature 2 New signature
+ * @return Union of 2 signatures
+ */
+ public Object union(Object cs1, Object cs2);
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
new file mode 100644
index 0000000..2eaa7be
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ServicePluginLifecycle {
+
+ /**
+ * Perform any additional initialization which may be required beyond the constructor.
+ */
+ void initialize() throws Exception;
+
+ /**
+ * Start the service. This will be invoked after initialization.
+ */
+ void start() throws Exception;
+
+ /**
+ * Shut down the service. This will be invoked when the service is shutting down.
+ */
+ void shutdown() throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
new file mode 100644
index 0000000..a5b054f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.ServicePluginLifecycle;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class TaskScheduler implements ServicePluginLifecycle {
+
+ private final TaskSchedulerContext taskSchedulerContext;
+
+ public TaskScheduler(TaskSchedulerContext taskSchedulerContext) {
+ this.taskSchedulerContext = taskSchedulerContext;
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ }
+
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ }
+
+
+ public abstract Resource getAvailableResources();
+
+ public abstract int getClusterNodeCount();
+
+ public abstract void dagComplete();
+
+ public abstract Resource getTotalResources();
+
+ public abstract void blacklistNode(NodeId nodeId);
+
+ public abstract void unblacklistNode(NodeId nodeId);
+
+ public abstract void allocateTask(Object task, Resource capability,
+ String[] hosts, String[] racks, Priority priority,
+ Object containerSignature, Object clientCookie);
+
+ /**
+ * Allocate a task with affinity to a specific container.
+ */
+ public abstract void allocateTask(Object task, Resource capability,
+ ContainerId containerId, Priority priority, Object containerSignature,
+ Object clientCookie);
+
+ /** Plugin writers must ensure that a container is de-allocated once it is done, so that it can be collected. */
+ public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason);
+
+ public abstract Object deallocateContainer(ContainerId containerId);
+
+ public abstract void setShouldUnregister();
+
+ public abstract boolean hasUnregistered();
+
+
+ public final TaskSchedulerContext getContext() {
+ return taskSchedulerContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
new file mode 100644
index 0000000..b2c8799
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.ContainerSignatureMatcher;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+
+public interface TaskSchedulerContext {
+
+ public class AppFinalStatus {
+ public final FinalApplicationStatus exitStatus;
+ public final String exitMessage;
+ public final String postCompletionTrackingUrl;
+ public AppFinalStatus(FinalApplicationStatus exitStatus,
+ String exitMessage,
+ String posCompletionTrackingUrl) {
+ this.exitStatus = exitStatus;
+ this.exitMessage = exitMessage;
+ this.postCompletionTrackingUrl = posCompletionTrackingUrl;
+ }
+ }
+
+ enum AMState {
+ IDLE, RUNNING_APP, COMPLETED
+ }
+
+ // TODO Post TEZ-2003. Remove references to YARN constructs like Container, ContainerStatus, NodeReport
+ // upcall to app must be outside locks
+ public void taskAllocated(Object task,
+ Object appCookie,
+ Container container);
+ // this may end up being called for a task+container pair that the app
+ // has not heard about. this can happen because of a race between
+ // taskAllocated() upcall and deallocateTask() downcall
+ public void containerCompleted(Object taskLastAllocated,
+ ContainerStatus containerStatus);
+ public void containerBeingReleased(ContainerId containerId);
+ public void nodesUpdated(List<NodeReport> updatedNodes);
+ public void appShutdownRequested();
+
+ // TODO Post TEZ-2003, this method specifically needs some cleaning up.
+ // ClientAMSecretKey is only relevant when running under YARN. As are ApplicationACLs.
+ public void setApplicationRegistrationData(
+ Resource maxContainerCapability,
+ Map<ApplicationAccessType, String> appAcls,
+ ByteBuffer clientAMSecretKey
+ );
+ public void onError(Throwable t);
+ public float getProgress();
+ public void preemptContainer(ContainerId containerId);
+
+ // TODO Post TEZ-2003. Another method which is primarily relevant to YARN clusters for unregistration.
+ public AppFinalStatus getFinalAppStatus();
+
+
+ // Getters
+
+ // TODO TEZ-2003. To be replaced by getInitialPayload
+ public Configuration getInitialConfiguration();
+
+ public String getAppTrackingUrl();
+
+ /**
+ * A custom cluster identifier allocated to schedulers to generate an AppId, if not making
+ * use of YARN
+ * @return the custom cluster identifier
+ */
+ public long getCustomClusterIdentifier();
+
+ public ContainerSignatureMatcher getContainerSignatureMatcher();
+
+ /**
+ * Get the application attempt id for the running application. Relevant when running under YARN
+ * @return the application attempt id of the running application
+ */
+ public ApplicationAttemptId getApplicationAttemptId();
+
+ public String getAppHostName();
+
+ public int getAppClientPort();
+
+ public boolean isSession();
+
+ public AMState getAMState();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 4c8c227..532e83c 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -45,6 +45,7 @@ import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
deleted file mode 100644
index 022cd7b..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-public interface TaskCommunicatorInterface {
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index d56fb95..ef27ddf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -98,7 +98,6 @@ import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.AsyncDispatcher;
import org.apache.tez.common.AsyncDispatcherConcurrent;
import org.apache.tez.common.GcTimeUpdater;
-import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezConverterUtils;
import org.apache.tez.common.TezUtilsInternal;
@@ -149,7 +148,7 @@ import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
import org.apache.tez.dag.app.rm.node.AMNodeTracker;
import org.apache.tez.dag.app.web.WebUIService;
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java
new file mode 100644
index 0000000..dac1b82
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.ServicePluginLifecycle;
+
+/**
+ * Provides service lifecycle management over ServicePlugins using {@link AbstractService}
+ * @param <T> the type of the wrapped service plugin
+ */
+public class ServicePluginLifecycleAbstractService<T extends ServicePluginLifecycle> extends AbstractService {
+
+ private final T service;
+
+ public ServicePluginLifecycleAbstractService(T service) {
+ super(service.getClass().getName());
+ this.service = service;
+ }
+
+ @Override
+ public void serviceInit(Configuration unused) throws Exception {
+ service.initialize();
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ service.start();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ service.shutdown();
+ }
+
+ public T getService() {
+ return service;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index ef789c5..476d00c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -20,16 +20,15 @@ package org.apache.tez.dag.app.rm;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import com.google.common.primitives.Ints;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,56 +42,30 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.common.ContainerSignatureMatcher;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class LocalTaskSchedulerService extends TaskSchedulerService {
+public class LocalTaskSchedulerService extends TaskScheduler {
private static final Logger LOG = LoggerFactory.getLogger(LocalTaskSchedulerService.class);
- final TaskSchedulerAppCallback realAppClient;
- final TaskSchedulerAppCallback appClientDelegate;
final ContainerSignatureMatcher containerSignatureMatcher;
final PriorityBlockingQueue<TaskRequest> taskRequestQueue;
+ final Configuration conf;
AsyncDelegateRequestHandler taskRequestHandler;
Thread asyncDelegateRequestThread;
- final ExecutorService appCallbackExecutor;
final HashMap<Object, Container> taskAllocations;
- final String appHostName;
- final int appHostPort;
final String appTrackingUrl;
- final AppContext appContext;
final long customContainerAppId;
- public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
- int appHostPort, String appTrackingUrl, long customContainerAppId, AppContext appContext) {
- super(LocalTaskSchedulerService.class.getName());
- this.realAppClient = appClient;
- this.appCallbackExecutor = createAppCallbackExecutorService();
- this.containerSignatureMatcher = containerSignatureMatcher;
- this.appClientDelegate = createAppCallbackDelegate(appClient);
- this.appHostName = appHostName;
- this.appHostPort = appHostPort;
- this.appTrackingUrl = appTrackingUrl;
- this.appContext = appContext;
+ public LocalTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
+ super(taskSchedulerContext);
taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
taskAllocations = new LinkedHashMap<Object, Container>();
- this.customContainerAppId = customContainerAppId;
- }
-
- private ExecutorService createAppCallbackExecutorService() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
- }
-
- private TaskSchedulerAppCallback createAppCallbackDelegate(
- TaskSchedulerAppCallback realAppClient) {
- return new TaskSchedulerAppCallbackWrapper(realAppClient,
- appCallbackExecutor);
+ this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
+ this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
+ this.customContainerAppId = taskSchedulerContext.getCustomClusterIdentifier();
+ this.conf = taskSchedulerContext.getInitialConfiguration();
}
@Override
@@ -160,7 +133,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
}
@Override
- public void serviceInit(Configuration conf) {
+ public void initialize() {
taskRequestHandler = createRequestHandler(conf);
asyncDelegateRequestThread = new Thread(taskRequestHandler);
asyncDelegateRequestThread.setDaemon(true);
@@ -168,24 +141,22 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
return new AsyncDelegateRequestHandler(taskRequestQueue,
- new LocalContainerFactory(appContext, customContainerAppId),
+ new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId),
taskAllocations,
- appClientDelegate,
+ getContext(),
conf);
}
@Override
- public void serviceStart() {
+ public void start() {
asyncDelegateRequestThread.start();
}
@Override
- public void serviceStop() throws InterruptedException {
+ public void shutdown() throws InterruptedException {
if (asyncDelegateRequestThread != null) {
asyncDelegateRequestThread.interrupt();
}
- appCallbackExecutor.shutdownNow();
- appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
}
@Override
@@ -202,12 +173,12 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
AtomicInteger nextId;
final ApplicationAttemptId customAppAttemptId;
- public LocalContainerFactory(AppContext appContext, long appIdLong) {
+ public LocalContainerFactory(ApplicationAttemptId appAttemptId, long customAppId) {
this.nextId = new AtomicInteger(1);
ApplicationId appId = ApplicationId
- .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+ .newInstance(customAppId, appAttemptId.getApplicationId().getId());
this.customAppAttemptId = ApplicationAttemptId
- .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
+ .newInstance(appId, appAttemptId.getAttemptId());
}
public Container createContainer(Resource capability, Priority priority) {
@@ -330,18 +301,18 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
final BlockingQueue<TaskRequest> taskRequestQueue;
final LocalContainerFactory localContainerFactory;
final HashMap<Object, Container> taskAllocations;
- final TaskSchedulerAppCallback appClientDelegate;
+ final TaskSchedulerContext taskSchedulerContext;
final int MAX_TASKS;
AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue,
LocalContainerFactory localContainerFactory,
HashMap<Object, Container> taskAllocations,
- TaskSchedulerAppCallback appClientDelegate,
+ TaskSchedulerContext taskSchedulerContext,
Configuration conf) {
this.taskRequestQueue = taskRequestQueue;
this.localContainerFactory = localContainerFactory;
this.taskAllocations = taskAllocations;
- this.appClientDelegate = appClientDelegate;
+ this.taskSchedulerContext = taskSchedulerContext;
this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
}
@@ -407,13 +378,13 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
Container container = localContainerFactory.createContainer(request.capability,
request.priority);
taskAllocations.put(request.task, container);
- appClientDelegate.taskAllocated(request.task, request.clientCookie, container);
+ taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container);
}
void deallocateTask(DeallocateTaskRequest request) {
Container container = taskAllocations.remove(request.task);
if (container != null) {
- appClientDelegate.containerBeingReleased(container.getId());
+ taskSchedulerContext.containerBeingReleased(container.getId());
}
else {
boolean deallocationBeforeAllocation = false;
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
deleted file mode 100644
index ea37e94..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public class TaskSchedulerAppCallbackImpl implements TaskSchedulerService.TaskSchedulerAppCallback{
-
- private final TaskSchedulerEventHandler tseh;
- private final int schedulerId;
-
- public TaskSchedulerAppCallbackImpl(TaskSchedulerEventHandler tseh, int schedulerId) {
- this.tseh = tseh;
- this.schedulerId = schedulerId;
- }
-
- @Override
- public void taskAllocated(Object task, Object appCookie, Container container) {
- tseh.taskAllocated(schedulerId, task, appCookie, container);
- }
-
- @Override
- public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
- tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
- }
-
- @Override
- public void containerBeingReleased(ContainerId containerId) {
- tseh.containerBeingReleased(schedulerId, containerId);
- }
-
- @Override
- public void nodesUpdated(List<NodeReport> updatedNodes) {
- tseh.nodesUpdated(schedulerId, updatedNodes);
- }
-
- @Override
- public void appShutdownRequested() {
- tseh.appShutdownRequested(schedulerId);
- }
-
- @Override
- public void setApplicationRegistrationData(Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls,
- ByteBuffer clientAMSecretKey) {
- tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey);
- }
-
- @Override
- public void onError(Throwable t) {
- tseh.onError(schedulerId, t);
- }
-
- @Override
- public float getProgress() {
- return tseh.getProgress(schedulerId);
- }
-
- @Override
- public void preemptContainer(ContainerId containerId) {
- tseh.preemptContainer(schedulerId, containerId);
- }
-
- @Override
- public AppFinalStatus getFinalAppStatus() {
- return tseh.getFinalAppStatus();
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
deleted file mode 100644
index 5de8032..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
-
-/**
- * Makes use of an ExecutionService to invoke application callbacks. Methods
- * which return values wait for execution to complete - effectively waiting for
- * all previous events in the queue to complete.
- */
-class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
-
- private TaskSchedulerAppCallback real;
-
- ExecutorService executorService;
-
- /**
- * @param real the actual TaskSchedulerAppCallback
- * @param executorService the ExecutorService to be used to send these events.
- */
- public TaskSchedulerAppCallbackWrapper(TaskSchedulerAppCallback real,
- ExecutorService executorService) {
- this.real = real;
- this.executorService = executorService;
- }
-
- @Override
- public void taskAllocated(Object task, Object appCookie, Container container) {
- executorService.submit(new TaskAllocatedCallable(real, task, appCookie,
- container));
- }
-
- @Override
- public void containerCompleted(Object taskLastAllocated,
- ContainerStatus containerStatus) {
- executorService.submit(new ContainerCompletedCallable(real,
- taskLastAllocated, containerStatus));
- }
-
- @Override
- public void containerBeingReleased(ContainerId containerId) {
- executorService
- .submit(new ContainerBeingReleasedCallable(real, containerId));
- }
-
- @Override
- public void nodesUpdated(List<NodeReport> updatedNodes) {
- executorService.submit(new NodesUpdatedCallable(real, updatedNodes));
- }
-
- @Override
- public void appShutdownRequested() {
- executorService.submit(new AppShudownRequestedCallable(real));
- }
-
- @Override
- public void setApplicationRegistrationData(Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
- executorService.submit(new SetApplicationRegistrationDataCallable(real,
- maxContainerCapability, appAcls, key));
- }
-
- @Override
- public void onError(Throwable t) {
- executorService.submit(new OnErrorCallable(real, t));
- }
-
- @Override
- public float getProgress() {
- Future<Float> progressFuture = executorService
- .submit(new GetProgressCallable(real));
- try {
- return progressFuture.get();
- } catch (Exception e) {
- throw new TezUncheckedException(e);
- }
- }
-
- @Override
- public void preemptContainer(ContainerId containerId) {
- executorService.submit(new PreemptContainerCallable(real, containerId));
- }
-
- @Override
- public AppFinalStatus getFinalAppStatus() {
- Future<AppFinalStatus> appFinalStatusFuture = executorService
- .submit(new GetFinalAppStatusCallable(real));
- try {
- return appFinalStatusFuture.get();
- } catch (Exception e) {
- throw new TezUncheckedException(e);
- }
- }
-
-
- static abstract class TaskSchedulerAppCallbackBase {
-
- protected TaskSchedulerAppCallback app;
-
- public TaskSchedulerAppCallbackBase(TaskSchedulerAppCallback app) {
- this.app = app;
- }
- }
-
- static class TaskAllocatedCallable extends TaskSchedulerAppCallbackBase
- implements Callable<Void> {
- private final Object task;
- private final Object appCookie;
- private final Container container;
-
- public TaskAllocatedCallable(TaskSchedulerAppCallback app, Object task,
- Object appCookie, Container container) {
- super(app);
- this.task = task;
- this.appCookie = appCookie;
- this.container = container;
- }
-
- @Override
- public Void call() throws Exception {
- app.taskAllocated(task, appCookie, container);
- return null;
- }
- }
-
- static class ContainerCompletedCallable extends TaskSchedulerAppCallbackBase
- implements Callable<Void> {
-
- private final Object taskLastAllocated;
- private final ContainerStatus containerStatus;
-
- public ContainerCompletedCallable(TaskSchedulerAppCallback app,
- Object taskLastAllocated, ContainerStatus containerStatus) {
- super(app);
- this.taskLastAllocated = taskLastAllocated;
- this.containerStatus = containerStatus;
- }
-
- @Override
- public Void call() throws Exception {
- app.containerCompleted(taskLastAllocated, containerStatus);
- return null;
- }
- }
-
- static class ContainerBeingReleasedCallable extends
- TaskSchedulerAppCallbackBase implements Callable<Void> {
- private final ContainerId containerId;
-
- public ContainerBeingReleasedCallable(TaskSchedulerAppCallback app,
- ContainerId containerId) {
- super(app);
- this.containerId = containerId;
- }
-
- @Override
- public Void call() throws Exception {
- app.containerBeingReleased(containerId);
- return null;
- }
- }
-
- static class NodesUpdatedCallable extends TaskSchedulerAppCallbackBase
- implements Callable<Void> {
- private final List<NodeReport> updatedNodes;
-
- public NodesUpdatedCallable(TaskSchedulerAppCallback app,
- List<NodeReport> updatedNodes) {
- super(app);
- this.updatedNodes = updatedNodes;
- }
-
- @Override
- public Void call() throws Exception {
- app.nodesUpdated(updatedNodes);
- return null;
- }
- }
-
- static class AppShudownRequestedCallable extends TaskSchedulerAppCallbackBase
- implements Callable<Void> {
-
- public AppShudownRequestedCallable(TaskSchedulerAppCallback app) {
- super(app);
- }
-
- @Override
- public Void call() throws Exception {
- app.appShutdownRequested();
- return null;
- }
- }
-
- static class SetApplicationRegistrationDataCallable extends
- TaskSchedulerAppCallbackBase implements Callable<Void> {
-
- private final Resource maxContainerCapability;
- private final Map<ApplicationAccessType, String> appAcls;
- private final ByteBuffer key;
-
- public SetApplicationRegistrationDataCallable(TaskSchedulerAppCallback app,
- Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls,
- ByteBuffer key) {
- super(app);
- this.maxContainerCapability = maxContainerCapability;
- this.appAcls = appAcls;
- this.key = key;
- }
-
- @Override
- public Void call() throws Exception {
- app.setApplicationRegistrationData(maxContainerCapability, appAcls, key);
- return null;
- }
- }
-
- static class OnErrorCallable extends TaskSchedulerAppCallbackBase implements
- Callable<Void> {
-
- private final Throwable throwable;
-
- public OnErrorCallable(TaskSchedulerAppCallback app, Throwable throwable) {
- super(app);
- this.throwable = throwable;
- }
-
- @Override
- public Void call() throws Exception {
- app.onError(throwable);
- return null;
- }
- }
-
- static class PreemptContainerCallable extends TaskSchedulerAppCallbackBase
- implements Callable<Void> {
- private final ContainerId containerId;
-
- public PreemptContainerCallable(TaskSchedulerAppCallback app, ContainerId id) {
- super(app);
- this.containerId = id;
- }
-
- @Override
- public Void call() throws Exception {
- app.preemptContainer(containerId);
- return null;
- }
- }
-
- static class GetProgressCallable extends TaskSchedulerAppCallbackBase
- implements Callable<Float> {
-
- public GetProgressCallable(TaskSchedulerAppCallback app) {
- super(app);
- }
-
- @Override
- public Float call() throws Exception {
- return app.getProgress();
- }
- }
-
- static class GetFinalAppStatusCallable extends TaskSchedulerAppCallbackBase
- implements Callable<AppFinalStatus> {
-
- public GetFinalAppStatusCallable(TaskSchedulerAppCallback app) {
- super(app);
- }
-
- @Override
- public AppFinalStatus call() throws Exception {
- return app.getFinalAppStatus();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
new file mode 100644
index 0000000..890870e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+
+public class TaskSchedulerContextImpl implements TaskSchedulerContext {
+
+ private final TaskSchedulerEventHandler tseh;
+ private final AppContext appContext;
+ private final int schedulerId;
+ private final String trackingUrl;
+ private final long customClusterIdentifier;
+ private final String appHostName;
+ private final int clientPort;
+ private final Configuration conf;
+
+ public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext,
+ int schedulerId, String trackingUrl, long customClusterIdentifier,
+ String appHostname, int clientPort,
+ Configuration conf) {
+ this.tseh = tseh;
+ this.appContext = appContext;
+ this.schedulerId = schedulerId;
+ this.trackingUrl = trackingUrl;
+ this.customClusterIdentifier = customClusterIdentifier;
+ this.appHostName = appHostname;
+ this.clientPort = clientPort;
+ this.conf = conf;
+
+ }
+
+ @Override
+ public void taskAllocated(Object task, Object appCookie, Container container) {
+ tseh.taskAllocated(schedulerId, task, appCookie, container);
+ }
+
+ @Override
+ public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
+ tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
+ }
+
+ @Override
+ public void containerBeingReleased(ContainerId containerId) {
+ tseh.containerBeingReleased(schedulerId, containerId);
+ }
+
+ @Override
+ public void nodesUpdated(List<NodeReport> updatedNodes) {
+ tseh.nodesUpdated(schedulerId, updatedNodes);
+ }
+
+ @Override
+ public void appShutdownRequested() {
+ tseh.appShutdownRequested(schedulerId);
+ }
+
+ @Override
+ public void setApplicationRegistrationData(Resource maxContainerCapability,
+ Map<ApplicationAccessType, String> appAcls,
+ ByteBuffer clientAMSecretKey) {
+ tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ tseh.onError(schedulerId, t);
+ }
+
+ @Override
+ public float getProgress() {
+ return tseh.getProgress(schedulerId);
+ }
+
+ @Override
+ public void preemptContainer(ContainerId containerId) {
+ tseh.preemptContainer(schedulerId, containerId);
+ }
+
+ @Override
+ public AppFinalStatus getFinalAppStatus() {
+ return tseh.getFinalAppStatus();
+ }
+
+ @Override
+ public Configuration getInitialConfiguration() {
+ return conf;
+ }
+
+
+ @Override
+ public String getAppTrackingUrl() {
+ return trackingUrl;
+ }
+
+ @Override
+ public long getCustomClusterIdentifier() {
+ return customClusterIdentifier;
+ }
+
+ @Override
+ public ContainerSignatureMatcher getContainerSignatureMatcher() {
+ return tseh.getContainerSignatureMatcher();
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appContext.getApplicationAttemptId();
+ }
+
+ @Override
+ public String getAppHostName() {
+ return appHostName;
+ }
+
+ @Override
+ public int getAppClientPort() {
+ return clientPort;
+ }
+
+ @Override
+ public boolean isSession() {
+ return appContext.isSession();
+ }
+
+ @Override
+ public AMState getAMState() {
+ switch (appContext.getAMState()) {
+
+ case NEW:
+ case INITED:
+ case IDLE:
+ return AMState.IDLE;
+ case RECOVERING:
+ // TODO Is it correct to report RECOVERING as RUNNING_APP (falls through to RUNNING)?
+ case RUNNING:
+ return AMState.RUNNING_APP;
+ case SUCCEEDED:
+ case FAILED:
+ case KILLED:
+ case ERROR:
+ return AMState.COMPLETED;
+ default:
+ throw new TezUncheckedException("Unexpected state " + appContext.getAMState());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
new file mode 100644
index 0000000..e64ef43
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+
+/**
+ * Makes use of an ExecutorService to invoke application callbacks. Methods
+ * which return values wait for execution to complete - effectively waiting for
+ * all previous events in the queue to complete.
+ */
+class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
+
+ private TaskSchedulerContext real;
+
+ private ExecutorService executorService;
+
+ /**
+ * @param real the actual TaskSchedulerContext
+ * @param executorService the ExecutorService to be used to send these events.
+ */
+ public TaskSchedulerContextImplWrapper(TaskSchedulerContext real,
+ ExecutorService executorService) {
+ this.real = real;
+ this.executorService = executorService;
+ }
+
+ @Override
+ public void taskAllocated(Object task, Object appCookie, Container container) {
+ executorService.submit(new TaskAllocatedCallable(real, task, appCookie,
+ container));
+ }
+
+ @Override
+ public void containerCompleted(Object taskLastAllocated,
+ ContainerStatus containerStatus) {
+ executorService.submit(new ContainerCompletedCallable(real,
+ taskLastAllocated, containerStatus));
+ }
+
+ @Override
+ public void containerBeingReleased(ContainerId containerId) {
+ executorService
+ .submit(new ContainerBeingReleasedCallable(real, containerId));
+ }
+
+ @Override
+ public void nodesUpdated(List<NodeReport> updatedNodes) {
+ executorService.submit(new NodesUpdatedCallable(real, updatedNodes));
+ }
+
+ @Override
+ public void appShutdownRequested() {
+ executorService.submit(new AppShudownRequestedCallable(real));
+ }
+
+ @Override
+ public void setApplicationRegistrationData(Resource maxContainerCapability,
+ Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
+ executorService.submit(new SetApplicationRegistrationDataCallable(real,
+ maxContainerCapability, appAcls, key));
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ executorService.submit(new OnErrorCallable(real, t));
+ }
+
+ @Override
+ public float getProgress() {
+ Future<Float> progressFuture = executorService
+ .submit(new GetProgressCallable(real));
+ try {
+ return progressFuture.get();
+ } catch (Exception e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ @Override
+ public void preemptContainer(ContainerId containerId) {
+ executorService.submit(new PreemptContainerCallable(real, containerId));
+ }
+
+ @Override
+ public AppFinalStatus getFinalAppStatus() {
+ Future<AppFinalStatus> appFinalStatusFuture = executorService
+ .submit(new GetFinalAppStatusCallable(real));
+ try {
+ return appFinalStatusFuture.get();
+ } catch (Exception e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ // Getters which do not need to go through a thread. Underlying implementation
+ // does not use locks.
+
+ @Override
+ public Configuration getInitialConfiguration() {
+ return real.getInitialConfiguration();
+ }
+
+ @Override
+ public String getAppTrackingUrl() {
+ return real.getAppTrackingUrl();
+ }
+
+ @Override
+ public long getCustomClusterIdentifier() {
+ return real.getCustomClusterIdentifier();
+ }
+
+ @Override
+ public ContainerSignatureMatcher getContainerSignatureMatcher() {
+ return real.getContainerSignatureMatcher();
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return real.getApplicationAttemptId();
+ }
+
+ @Override
+ public String getAppHostName() {
+ return real.getAppHostName();
+ }
+
+ @Override
+ public int getAppClientPort() {
+ return real.getAppClientPort();
+ }
+
+ @Override
+ public boolean isSession() {
+ return real.isSession();
+ }
+
+ @Override
+ public AMState getAMState() {
+ return real.getAMState();
+ }
+ // End of getters which do not need to go through a thread. Underlying implementation
+ // does not use locks.
+
+
+ static abstract class TaskSchedulerContextCallbackBase {
+
+ protected TaskSchedulerContext app;
+
+ public TaskSchedulerContextCallbackBase(TaskSchedulerContext app) {
+ this.app = app;
+ }
+ }
+
+ static class TaskAllocatedCallable extends TaskSchedulerContextCallbackBase
+ implements Callable<Void> {
+ private final Object task;
+ private final Object appCookie;
+ private final Container container;
+
+ public TaskAllocatedCallable(TaskSchedulerContext app, Object task,
+ Object appCookie, Container container) {
+ super(app);
+ this.task = task;
+ this.appCookie = appCookie;
+ this.container = container;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.taskAllocated(task, appCookie, container);
+ return null;
+ }
+ }
+
+ static class ContainerCompletedCallable extends TaskSchedulerContextCallbackBase
+ implements Callable<Void> {
+
+ private final Object taskLastAllocated;
+ private final ContainerStatus containerStatus;
+
+ public ContainerCompletedCallable(TaskSchedulerContext app,
+ Object taskLastAllocated, ContainerStatus containerStatus) {
+ super(app);
+ this.taskLastAllocated = taskLastAllocated;
+ this.containerStatus = containerStatus;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.containerCompleted(taskLastAllocated, containerStatus);
+ return null;
+ }
+ }
+
+ static class ContainerBeingReleasedCallable extends
+ TaskSchedulerContextCallbackBase implements Callable<Void> {
+ private final ContainerId containerId;
+
+ public ContainerBeingReleasedCallable(TaskSchedulerContext app,
+ ContainerId containerId) {
+ super(app);
+ this.containerId = containerId;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.containerBeingReleased(containerId);
+ return null;
+ }
+ }
+
+ static class NodesUpdatedCallable extends TaskSchedulerContextCallbackBase
+ implements Callable<Void> {
+ private final List<NodeReport> updatedNodes;
+
+ public NodesUpdatedCallable(TaskSchedulerContext app,
+ List<NodeReport> updatedNodes) {
+ super(app);
+ this.updatedNodes = updatedNodes;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.nodesUpdated(updatedNodes);
+ return null;
+ }
+ }
+
+ static class AppShudownRequestedCallable extends TaskSchedulerContextCallbackBase
+ implements Callable<Void> {
+
+ public AppShudownRequestedCallable(TaskSchedulerContext app) {
+ super(app);
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.appShutdownRequested();
+ return null;
+ }
+ }
+
+ static class SetApplicationRegistrationDataCallable extends
+ TaskSchedulerContextCallbackBase implements Callable<Void> {
+
+ private final Resource maxContainerCapability;
+ private final Map<ApplicationAccessType, String> appAcls;
+ private final ByteBuffer key;
+
+ public SetApplicationRegistrationDataCallable(TaskSchedulerContext app,
+ Resource maxContainerCapability,
+ Map<ApplicationAccessType, String> appAcls,
+ ByteBuffer key) {
+ super(app);
+ this.maxContainerCapability = maxContainerCapability;
+ this.appAcls = appAcls;
+ this.key = key;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.setApplicationRegistrationData(maxContainerCapability, appAcls, key);
+ return null;
+ }
+ }
+
+ static class OnErrorCallable extends TaskSchedulerContextCallbackBase implements
+ Callable<Void> {
+
+ private final Throwable throwable;
+
+ public OnErrorCallable(TaskSchedulerContext app, Throwable throwable) {
+ super(app);
+ this.throwable = throwable;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.onError(throwable);
+ return null;
+ }
+ }
+
+ static class PreemptContainerCallable extends TaskSchedulerContextCallbackBase
+ implements Callable<Void> {
+ private final ContainerId containerId;
+
+ public PreemptContainerCallable(TaskSchedulerContext app, ContainerId id) {
+ super(app);
+ this.containerId = id;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.preemptContainer(containerId);
+ return null;
+ }
+ }
+
+ static class GetProgressCallable extends TaskSchedulerContextCallbackBase
+ implements Callable<Float> {
+
+ public GetProgressCallable(TaskSchedulerContext app) {
+ super(app);
+ }
+
+ @Override
+ public Float call() throws Exception {
+ return app.getProgress();
+ }
+ }
+
+ static class GetFinalAppStatusCallable extends TaskSchedulerContextCallbackBase
+ implements Callable<AppFinalStatus> {
+
+ public GetFinalAppStatusCallable(TaskSchedulerContext app) {
+ super(app);
+ }
+
+ @Override
+ public AppFinalStatus call() throws Exception {
+ return app.getFinalAppStatus();
+ }
+ }
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ ExecutorService getExecutorService() {
+ return executorService;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 1ad0059..d8cf080 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -25,11 +25,19 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -62,7 +70,6 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
@@ -70,7 +77,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
import org.apache.tez.dag.app.rm.container.AMContainerState;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
@@ -106,7 +113,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements
new AtomicBoolean(false);
private final WebUIService webUI;
private final String[] taskSchedulerClasses;
- protected final TaskSchedulerService []taskSchedulers;
+ protected final TaskScheduler[]taskSchedulers;
+ protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
+
+ // Single executor service shared by all Schedulers for context callbacks
+ @VisibleForTesting
+ final ExecutorService appCallbackExecutor;
private final boolean isPureLocalMode;
// If running in non local-only mode, the YARN task scheduler will always run to take care of
@@ -147,6 +159,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
this.webUI = webUI;
this.historyUrl = getHistoryUrl();
this.isPureLocalMode = isPureLocalMode;
+ this.appCallbackExecutor = createAppCallbackExecutorService();
if (this.webUI != null) {
this.webUI.setHistoryUrl(this.historyUrl);
}
@@ -181,7 +194,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
}
}
- taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length];
+ taskSchedulers = new TaskScheduler[this.taskSchedulerClasses.length];
+ taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerClasses.length];
}
public Map<ApplicationAccessType, String> getApplicationAcls() {
@@ -205,6 +219,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements
return taskSchedulers[schedulerId].getTotalResources();
}
+ private ExecutorService createAppCallbackExecutorService() {
+ return Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d").setDaemon(true)
+ .build());
+ }
+
public synchronized void handleEvent(AMSchedulerEvent sEvent) {
LOG.info("Processing the event " + sEvent.toString());
switch (sEvent.getType()) {
@@ -315,7 +335,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
// stopped.
// AMNodeImpl blacklisting logic does not account for KILLED attempts.
sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
- get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(), attemptContainerId,
+ get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(),
+ attemptContainerId,
attempt.getID(), event.getState() == TaskAttemptState.FAILED));
}
}
@@ -389,32 +410,30 @@ public class TaskSchedulerEventHandler extends AbstractService implements
event);
}
- private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
+ private TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
AppContext appContext,
String schedulerClassName,
long customAppIdIdentifier,
int schedulerId) {
- TaskSchedulerAppCallback appCallback = new TaskSchedulerAppCallbackImpl(this, schedulerId);
+ TaskSchedulerContext rawContext =
+ new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
+ customAppIdIdentifier, host, port, getConfig());
+ TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
- return new YarnTaskSchedulerService(appCallback, this.containerSignatureMatcher,
- host, port, trackingUrl, appContext);
+ return new YarnTaskSchedulerService(wrappedContext);
} else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Creating TaskScheduler: Local TaskScheduler");
- return new LocalTaskSchedulerService(appCallback, this.containerSignatureMatcher,
- host, port, trackingUrl, customAppIdIdentifier, appContext);
+ return new LocalTaskSchedulerService(wrappedContext);
} else {
LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
- // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface.
- Class<? extends TaskSchedulerService> taskSchedulerClazz =
- (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
+ Class<? extends TaskScheduler> taskSchedulerClazz =
+ (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(schedulerClassName);
try {
- Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
- .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
- int.class, String.class, long.class, Configuration.class);
+ Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
+ .getConstructor(TaskSchedulerContext.class);
ctor.setAccessible(true);
- return ctor.newInstance(appCallback, appContext, host, port, trackingUrl, customAppIdIdentifier,
- getConfig());
+ return ctor.newInstance(wrappedContext);
} catch (NoSuchMethodException e) {
throw new TezUncheckedException(e);
} catch (InvocationTargetException e) {
@@ -444,6 +463,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
customAppIdIdentifier);
taskSchedulers[i] = createTaskScheduler(host, port,
trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i);
+ taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskSchedulers[i]);
}
}
@@ -460,8 +480,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
for (int i = 0 ; i < taskSchedulers.length ; i++) {
- taskSchedulers[i].init(getConfig());
- taskSchedulers[i].start();
+ taskSchedulerServiceWrappers[i].init(getConfig());
+ taskSchedulerServiceWrappers[i].start();
if (shouldUnregisterFlag.get()) {
// Flag may have been set earlier when task scheduler was not initialized
// TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ?
@@ -510,7 +530,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
}
@Override
- public void serviceStop() {
+ public void serviceStop() throws InterruptedException {
synchronized(this) {
this.stopEventHandling = true;
if (eventHandlingThread != null)
@@ -518,9 +538,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements
}
for (int i = 0 ; i < taskSchedulers.length ; i++) {
if (taskSchedulers[i] != null) {
- taskSchedulers[i].stop();
+ taskSchedulerServiceWrappers[i].stop();
}
}
+ LOG.info("Shutting down AppCallbackExecutor");
+ appCallbackExecutor.shutdownNow();
+ appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
}
// TODO TEZ-2003 Consolidate TaskSchedulerAppCallback methods once these methods are moved into context
@@ -716,6 +739,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements
}
}
+ public ContainerSignatureMatcher getContainerSignatureMatcher() {
+ return containerSignatureMatcher;
+ }
+
public boolean hasUnregistered() {
boolean result = true;
for (int i = 0 ; i < taskSchedulers.length ; i++) {
@@ -757,4 +784,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements
return historyUrl;
}
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ ExecutorService getContextExecutorService() {
+ return appCallbackExecutor;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
deleted file mode 100644
index 25fd13e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-
-public abstract class TaskSchedulerService extends AbstractService{
-
- public TaskSchedulerService(String name) {
- super(name);
- }
-
- public abstract Resource getAvailableResources();
-
- public abstract int getClusterNodeCount();
-
- public abstract void dagComplete();
-
- public abstract Resource getTotalResources();
-
- public abstract void blacklistNode(NodeId nodeId);
-
- public abstract void unblacklistNode(NodeId nodeId);
-
- public abstract void allocateTask(Object task, Resource capability,
- String[] hosts, String[] racks, Priority priority,
- Object containerSignature, Object clientCookie);
-
- /**
- * Allocate affinitized to a specific container
- */
- public abstract void allocateTask(Object task, Resource capability,
- ContainerId containerId, Priority priority, Object containerSignature,
- Object clientCookie);
-
- /** Plugin writers must ensure to de-allocate a container once it's done, so that it can be collected. */
- public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason);
-
- public abstract Object deallocateContainer(ContainerId containerId);
-
- public abstract void setShouldUnregister();
-
- public abstract boolean hasUnregistered();
-
- public interface TaskSchedulerAppCallback {
- public class AppFinalStatus {
- public final FinalApplicationStatus exitStatus;
- public final String exitMessage;
- public final String postCompletionTrackingUrl;
- public AppFinalStatus(FinalApplicationStatus exitStatus,
- String exitMessage,
- String posCompletionTrackingUrl) {
- this.exitStatus = exitStatus;
- this.exitMessage = exitMessage;
- this.postCompletionTrackingUrl = posCompletionTrackingUrl;
- }
- }
- // upcall to app must be outside locks
- public void taskAllocated(Object task,
- Object appCookie,
- Container container);
- // this may end up being called for a task+container pair that the app
- // has not heard about. this can happen because of a race between
- // taskAllocated() upcall and deallocateTask() downcall
- public void containerCompleted(Object taskLastAllocated,
- ContainerStatus containerStatus);
- public void containerBeingReleased(ContainerId containerId);
- public void nodesUpdated(List<NodeReport> updatedNodes);
- public void appShutdownRequested();
- public void setApplicationRegistrationData(
- Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls,
- ByteBuffer clientAMSecretKey
- );
- public void onError(Throwable t);
- public float getProgress();
- public void preemptContainer(ContainerId containerId);
- public AppFinalStatus getFinalAppStatus();
-
- }
-}