You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/21 03:36:24 UTC

[16/50] [abbrv] tez git commit: TEZ-2005. Define basic interface for pluggable TaskScheduler. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/b6582f06/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 506e991..7d209bc 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -20,14 +20,10 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,30 +33,24 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.service.TezTestServiceConfConstants;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
+public class TezTestServiceTaskSchedulerService extends TaskScheduler {
 
   private static final Logger
       LOG = LoggerFactory.getLogger(TezTestServiceTaskSchedulerService.class);
 
-  private final ExecutorService appCallbackExecutor;
-  private final TaskSchedulerAppCallback appClientDelegate;
-  private final AppContext appContext;
   private final List<String> serviceHosts;
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
   // Currently all services must be running on the same port.
   private final int containerPort;
 
-  private final String clientHostname;
-  private final int clientPort;
-  private final String trackingUrl;
-  private final AtomicBoolean isStopped = new AtomicBoolean(false);
   private final ConcurrentMap<Object, ContainerId> runningTasks =
       new ConcurrentHashMap<Object, ContainerId>();
 
@@ -77,20 +67,14 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
 
   // Not registering with the RM. Assuming the main TezScheduler will always run (except local mode),
   // and take care of YARN registration.
-  public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
-                                            AppContext appContext,
-                                            String clientHostname, int clientPort,
-                                            String trackingUrl,
-                                            long customAppIdIdentifier,
-                                            Configuration conf) {
+  public TezTestServiceTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
     // Accepting configuration here to allow setting up fields as final
-    super(TezTestServiceTaskSchedulerService.class.getName());
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    this.appClientDelegate = createAppCallbackDelegate(appClient);
-    this.appContext = appContext;
+    super(taskSchedulerContext);
     this.serviceHosts = new LinkedList<String>();
-    this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
+    this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
+        taskSchedulerContext.getCustomClusterIdentifier());
 
+    Configuration conf = taskSchedulerContext.getInitialConfiguration();
     this.memoryPerInstance = conf
         .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
     Preconditions.checkArgument(memoryPerInstance > 0,
@@ -112,10 +96,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
     Preconditions.checkArgument(executorsPerInstance > 0,
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be configured");
 
-    this.clientHostname = clientHostname;
-    this.clientPort = clientPort;
-    this.trackingUrl = trackingUrl;
-
     int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
@@ -138,13 +118,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
   }
 
   @Override
-  public void serviceStop() {
-    if (!this.isStopped.getAndSet(true)) {
-      appCallbackExecutor.shutdownNow();
-    }
-  }
-
-  @Override
   public Resource getAvailableResources() {
     // TODO This needs information about all running executors, and the amount of memory etc available across the cluster.
     return Resource
@@ -185,7 +158,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
     Container container =
         containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
     runningTasks.put(task, container.getId());
-    appClientDelegate.taskAllocated(task, clientCookie, container);
+    getContext().taskAllocated(task, clientCookie, container);
   }
 
 
@@ -196,7 +169,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
     Container container =
         containerFactory.createContainer(resourcePerContainer, priority, host, containerPort);
     runningTasks.put(task, container.getId());
-    appClientDelegate.taskAllocated(task, clientCookie, container);
+    getContext().taskAllocated(task, clientCookie, container);
   }
 
   @Override
@@ -208,7 +181,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
           " The query may hang since this \"unknown\" container is now taking up a slot permanently");
       return false;
     }
-    appClientDelegate.containerBeingReleased(containerId);
+    getContext().containerBeingReleased(containerId);
     return true;
   }
 
@@ -229,17 +202,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
     return true;
   }
 
-  private ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TezTestTaskSchedulerAppCaller").setDaemon(true).build());
-  }
-
-  private TaskSchedulerAppCallback createAppCallbackDelegate(
-      TaskSchedulerAppCallback realAppClient) {
-    return new TaskSchedulerAppCallbackWrapper(realAppClient,
-        appCallbackExecutor);
-  }
-
   private String selectHost(String[] requestedHosts) {
     String host;
     if (requestedHosts != null && requestedHosts.length > 0) {
@@ -257,12 +219,12 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
     AtomicInteger nextId;
     final ApplicationAttemptId customAppAttemptId;
 
-    public ContainerFactory(AppContext appContext, long appIdLong) {
+    public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) {
       this.nextId = new AtomicInteger(1);
       ApplicationId appId = ApplicationId
-          .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+          .newInstance(appIdLong, appAttemptId.getApplicationId().getId());
       this.customAppAttemptId = ApplicationAttemptId
-          .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
+          .newInstance(appId, appAttemptId.getAttemptId());
     }
 
     public Container createContainer(Resource capability, Priority priority, String hostname, int port) {