You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:48 UTC
[30/50] [abbrv] tez git commit: TEZ-2005. Define basic interface for
pluggable TaskScheduler. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 506e991..7d209bc 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -20,14 +20,10 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,30 +33,24 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.app.AppContext;
import org.apache.tez.service.TezTestServiceConfConstants;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
+public class TezTestServiceTaskSchedulerService extends TaskScheduler {
private static final Logger
LOG = LoggerFactory.getLogger(TezTestServiceTaskSchedulerService.class);
- private final ExecutorService appCallbackExecutor;
- private final TaskSchedulerAppCallback appClientDelegate;
- private final AppContext appContext;
private final List<String> serviceHosts;
private final ContainerFactory containerFactory;
private final Random random = new Random();
// Currently all services must be running on the same port.
private final int containerPort;
- private final String clientHostname;
- private final int clientPort;
- private final String trackingUrl;
- private final AtomicBoolean isStopped = new AtomicBoolean(false);
private final ConcurrentMap<Object, ContainerId> runningTasks =
new ConcurrentHashMap<Object, ContainerId>();
@@ -77,20 +67,14 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
// Not registering with the RM. Assuming the main TezScheduler will always run (except local mode),
// and take care of YARN registration.
- public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
- AppContext appContext,
- String clientHostname, int clientPort,
- String trackingUrl,
- long customAppIdIdentifier,
- Configuration conf) {
+ public TezTestServiceTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
// Accepting configuration here to allow setting up fields as final
- super(TezTestServiceTaskSchedulerService.class.getName());
- this.appCallbackExecutor = createAppCallbackExecutorService();
- this.appClientDelegate = createAppCallbackDelegate(appClient);
- this.appContext = appContext;
+ super(taskSchedulerContext);
this.serviceHosts = new LinkedList<String>();
- this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
+ this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
+ taskSchedulerContext.getCustomClusterIdentifier());
+ Configuration conf = taskSchedulerContext.getInitialConfiguration();
this.memoryPerInstance = conf
.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
Preconditions.checkArgument(memoryPerInstance > 0,
@@ -112,10 +96,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
Preconditions.checkArgument(executorsPerInstance > 0,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured");
- this.clientHostname = clientHostname;
- this.clientPort = clientPort;
- this.trackingUrl = trackingUrl;
-
int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
@@ -138,13 +118,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
}
@Override
- public void serviceStop() {
- if (!this.isStopped.getAndSet(true)) {
- appCallbackExecutor.shutdownNow();
- }
- }
-
- @Override
public Resource getAvailableResources() {
// TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
return Resource
@@ -185,7 +158,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
Container container =
containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
runningTasks.put(task, container.getId());
- appClientDelegate.taskAllocated(task, clientCookie, container);
+ getContext().taskAllocated(task, clientCookie, container);
}
@@ -196,7 +169,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
Container container =
containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
runningTasks.put(task, container.getId());
- appClientDelegate.taskAllocated(task, clientCookie, container);
+ getContext().taskAllocated(task, clientCookie, container);
}
@Override
@@ -208,7 +181,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
" The query may hang since this \"unknown\" container is now taking up a slot permanently");
return false;
}
- appClientDelegate.containerBeingReleased(containerId);
+ getContext().containerBeingReleased(containerId);
return true;
}
@@ -229,17 +202,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
return true;
}
- private ExecutorService createAppCallbackExecutorService() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("TezTestTaskSchedulerAppCaller").setDaemon(true).build());
- }
-
- private TaskSchedulerAppCallback createAppCallbackDelegate(
- TaskSchedulerAppCallback realAppClient) {
- return new TaskSchedulerAppCallbackWrapper(realAppClient,
- appCallbackExecutor);
- }
-
private String selectHost(String[] requestedHosts) {
String host;
if (requestedHosts != null && requestedHosts.length > 0) {
@@ -257,12 +219,12 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
AtomicInteger nextId;
final ApplicationAttemptId customAppAttemptId;
- public ContainerFactory(AppContext appContext, long appIdLong) {
+ public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) {
this.nextId = new AtomicInteger(1);
ApplicationId appId = ApplicationId
- .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+ .newInstance(appIdLong, appAttemptId.getApplicationId().getId());
this.customAppAttemptId = ApplicationAttemptId
- .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
+ .newInstance(appId, appAttemptId.getAttemptId());
}
public Container createContainer(Resource capability, Priority priority, String hostname, int port) {