You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/12/12 22:59:48 UTC

[06/28] Making maven site works.

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
deleted file mode 100644
index b51bb63..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
-import org.apache.twill.internal.yarn.YarnLaunchContext;
-import org.apache.twill.internal.yarn.YarnUtils;
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-
-import java.util.Map;
-
-/**
- * A {@link org.apache.twill.internal.ProcessLauncher} for launching Application Master from the client.
- */
-public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationId> {
-
-  private final ApplicationSubmitter submitter;
-
-  public ApplicationMasterProcessLauncher(ApplicationId appId, ApplicationSubmitter submitter) {
-    super(appId);
-    this.submitter = submitter;
-  }
-
-  @Override
-  protected boolean useArchiveSuffix() {
-    return false;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
-    final ApplicationId appId = getContainerInfo();
-
-    // Set the resource requirement for AM
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(Constants.APP_MASTER_MEMORY_MB);
-    YarnUtils.setVirtualCores(capability, 1);
-
-    // Put in extra environments
-    Map<String, String> env = ImmutableMap.<String, String>builder()
-      .putAll(launchContext.getEnvironment())
-      .put(EnvKeys.YARN_APP_ID, Integer.toString(appId.getId()))
-      .put(EnvKeys.YARN_APP_ID_CLUSTER_TIME, Long.toString(appId.getClusterTimestamp()))
-      .put(EnvKeys.YARN_APP_ID_STR, appId.toString())
-      .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(Constants.APP_MASTER_MEMORY_MB))
-      .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(YarnUtils.getVirtualCores(capability)))
-      .build();
-
-    launchContext.setEnvironment(env);
-    return (ProcessController<R>) submitter.submit(launchContext, capability);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
deleted file mode 100644
index 51c8503..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ /dev/null
@@ -1,799 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.api.Command;
-import org.apache.twill.api.EventHandler;
-import org.apache.twill.api.EventHandlerSpecification;
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.common.Threads;
-import org.apache.twill.filesystem.Location;
-import org.apache.twill.internal.AbstractTwillService;
-import org.apache.twill.internal.Configs;
-import org.apache.twill.internal.Constants;
-import org.apache.twill.internal.DefaultTwillRunResources;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.ProcessLauncher;
-import org.apache.twill.internal.TwillContainerLauncher;
-import org.apache.twill.internal.ZKServiceDecorator;
-import org.apache.twill.internal.json.LocalFileCodec;
-import org.apache.twill.internal.json.TwillSpecificationAdapter;
-import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.logging.Loggings;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.state.MessageCallback;
-import org.apache.twill.internal.utils.Instances;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.internal.yarn.YarnAMClient;
-import org.apache.twill.internal.yarn.YarnAMClientFactory;
-import org.apache.twill.internal.yarn.YarnContainerInfo;
-import org.apache.twill.internal.yarn.YarnContainerStatus;
-import org.apache.twill.internal.yarn.YarnUtils;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClients;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multiset;
-import com.google.common.collect.Sets;
-import com.google.common.io.CharStreams;
-import com.google.common.io.Files;
-import com.google.common.io.InputSupplier;
-import com.google.common.reflect.TypeToken;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.zookeeper.CreateMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.Reader;
-import java.net.URI;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public final class ApplicationMasterService extends AbstractTwillService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
-
-  // Copied from org.apache.hadoop.yarn.security.AMRMTokenIdentifier.KIND_NAME since it's missing in Hadoop-2.0
-  private static final Text AMRM_TOKEN_KIND_NAME = new Text("YARN_AM_RM_TOKEN");
-
-  private final RunId runId;
-  private final ZKClient zkClient;
-  private final TwillSpecification twillSpec;
-  private final ApplicationMasterLiveNodeData amLiveNode;
-  private final ZKServiceDecorator serviceDelegate;
-  private final RunningContainers runningContainers;
-  private final ExpectedContainers expectedContainers;
-  private final TrackerService trackerService;
-  private final YarnAMClient amClient;
-  private final String jvmOpts;
-  private final int reservedMemory;
-  private final EventHandler eventHandler;
-  private final Location applicationLocation;
-
-  private EmbeddedKafkaServer kafkaServer;
-  private Queue<RunnableContainerRequest> runnableContainerRequests;
-  private ExecutorService instanceChangeExecutor;
-
-  public ApplicationMasterService(RunId runId, ZKClient zkClient, File twillSpecFile,
-                                  YarnAMClientFactory amClientFactory, Location applicationLocation) throws Exception {
-    super(applicationLocation);
-
-    this.runId = runId;
-    this.twillSpec = TwillSpecificationAdapter.create().fromJson(twillSpecFile);
-    this.zkClient = zkClient;
-    this.applicationLocation = applicationLocation;
-    this.amClient = amClientFactory.create();
-    this.credentials = createCredentials();
-    this.jvmOpts = loadJvmOptions();
-    this.reservedMemory = getReservedMemory();
-
-    amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
-                                                   Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
-                                                   amClient.getContainerId().toString());
-
-    serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeDataSupplier(),
-                                             new ServiceDelegate(), new Runnable() {
-      @Override
-      public void run() {
-        amClient.stopAndWait();
-      }
-    });
-    expectedContainers = initExpectedContainers(twillSpec);
-    runningContainers = initRunningContainers(amClient.getContainerId(), amClient.getHost());
-    trackerService = new TrackerService(runningContainers.getResourceReport(), amClient.getHost());
-    eventHandler = createEventHandler(twillSpec);
-  }
-
-  private String loadJvmOptions() throws IOException {
-    final File jvmOptsFile = new File(Constants.Files.JVM_OPTIONS);
-    if (!jvmOptsFile.exists()) {
-      return "";
-    }
-
-    return CharStreams.toString(new InputSupplier<Reader>() {
-      @Override
-      public Reader getInput() throws IOException {
-        return new FileReader(jvmOptsFile);
-      }
-    });
-  }
-
-  private int getReservedMemory() {
-    String value = System.getenv(EnvKeys.TWILL_RESERVED_MEMORY_MB);
-    if (value == null) {
-      return Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
-    }
-    try {
-      return Integer.parseInt(value);
-    } catch (Exception e) {
-      return Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
-    }
-  }
-
-  private EventHandler createEventHandler(TwillSpecification twillSpec) {
-    try {
-      // Should be able to load by this class ClassLoader, as they packaged in the same jar.
-      EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
-
-      Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName());
-      Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
-                                  "Class {} does not implements {}",
-                                  handlerClass, EventHandler.class.getName());
-      return Instances.newInstance((Class<? extends EventHandler>) handlerClass);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private Supplier<? extends JsonElement> createLiveNodeDataSupplier() {
-    return new Supplier<JsonElement>() {
-      @Override
-      public JsonElement get() {
-        return new Gson().toJsonTree(amLiveNode);
-      }
-    };
-  }
-
-  private RunningContainers initRunningContainers(ContainerId appMasterContainerId,
-                                                  String appMasterHost) throws Exception {
-    TwillRunResources appMasterResources = new DefaultTwillRunResources(
-      0,
-      appMasterContainerId.toString(),
-      Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)),
-      Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)),
-      appMasterHost);
-    String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
-    return new RunningContainers(appId, appMasterResources);
-  }
-
-  private ExpectedContainers initExpectedContainers(TwillSpecification twillSpec) {
-    Map<String, Integer> expectedCounts = Maps.newHashMap();
-    for (RuntimeSpecification runtimeSpec : twillSpec.getRunnables().values()) {
-      expectedCounts.put(runtimeSpec.getName(), runtimeSpec.getResourceSpecification().getInstances());
-    }
-    return new ExpectedContainers(expectedCounts);
-  }
-
-  private void doStart() throws Exception {
-    LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec));
-
-    // initialize the event handler, if it fails, it will fail the application.
-    eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler()));
-
-    instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
-
-    kafkaServer = new EmbeddedKafkaServer(new File(Constants.Files.KAFKA), generateKafkaConfig());
-
-    // Must start tracker before start AMClient
-    LOG.info("Starting application master tracker server");
-    trackerService.startAndWait();
-    URL trackerUrl = trackerService.getUrl();
-    LOG.info("Started application master tracker server on " + trackerUrl);
-
-    amClient.setTracker(trackerService.getBindAddress(), trackerUrl);
-    amClient.startAndWait();
-
-    // Creates ZK path for runnable and kafka logging service
-    Futures.allAsList(ImmutableList.of(
-      zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
-      zkClient.create("/" + runId.getId() + "/kafka", null, CreateMode.PERSISTENT))
-    ).get();
-
-    // Starts kafka server
-    LOG.info("Starting kafka server");
-
-    kafkaServer.startAndWait();
-    LOG.info("Kafka server started");
-
-    runnableContainerRequests = initContainerRequests();
-  }
-
-  private void doStop() throws Exception {
-    Thread.interrupted();     // This is just to clear the interrupt flag
-
-    LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec));
-
-    try {
-      // call event handler destroy. If there is error, only log and not affected stop sequence.
-      eventHandler.destroy();
-    } catch (Throwable t) {
-      LOG.warn("Exception when calling {}.destroy()", twillSpec.getEventHandler().getClassName(), t);
-    }
-
-    instanceChangeExecutor.shutdownNow();
-
-    // For checking if all containers are stopped.
-    final Set<String> ids = Sets.newHashSet(runningContainers.getContainerIds());
-    YarnAMClient.AllocateHandler handler = new YarnAMClient.AllocateHandler() {
-      @Override
-      public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
-        // no-op
-      }
-
-      @Override
-      public void completed(List<YarnContainerStatus> completed) {
-        for (YarnContainerStatus status : completed) {
-          ids.remove(status.getContainerId());
-        }
-      }
-    };
-
-    runningContainers.stopAll();
-
-    // Poll for 5 seconds to wait for containers to stop.
-    int count = 0;
-    while (!ids.isEmpty() && count++ < 5) {
-      amClient.allocate(0.0f, handler);
-      TimeUnit.SECONDS.sleep(1);
-    }
-
-    LOG.info("Stopping application master tracker server");
-    try {
-      trackerService.stopAndWait();
-      LOG.info("Stopped application master tracker server");
-    } catch (Exception e) {
-      LOG.error("Failed to stop tracker service.", e);
-    } finally {
-      try {
-        // App location cleanup
-        cleanupDir(URI.create(System.getenv(EnvKeys.TWILL_APP_DIR)));
-        Loggings.forceFlush();
-        // Sleep a short while to let kafka clients to have chance to fetch the log
-        TimeUnit.SECONDS.sleep(1);
-      } finally {
-        kafkaServer.stopAndWait();
-        LOG.info("Kafka server stopped");
-      }
-    }
-  }
-
-  private void cleanupDir(URI appDir) {
-    try {
-      if (applicationLocation.delete(true)) {
-        LOG.info("Application directory deleted: {}", appDir);
-      } else {
-        LOG.warn("Failed to cleanup directory {}.", appDir);
-      }
-    } catch (Exception e) {
-      LOG.warn("Exception while cleanup directory {}.", appDir, e);
-    }
-  }
-
-
-  private void doRun() throws Exception {
-    // The main loop
-    Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> currentRequest = null;
-    final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();
-
-    YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
-      @Override
-      public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
-        launchRunnable(launchers, provisioning);
-      }
-
-      @Override
-      public void completed(List<YarnContainerStatus> completed) {
-        handleCompleted(completed);
-      }
-    };
-
-    long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
-    while (isRunning()) {
-      // Call allocate. It has to be made at first in order to be able to get cluster resource availability.
-      amClient.allocate(0.0f, allocateHandler);
-
-      // Looks for containers requests.
-      if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() && runningContainers.isEmpty()) {
-        LOG.info("All containers completed. Shutting down application master.");
-        break;
-      }
-
-      // If nothing is in provisioning, and no pending request, move to next one
-      while (provisioning.isEmpty() && currentRequest == null && !runnableContainerRequests.isEmpty()) {
-        currentRequest = runnableContainerRequests.peek().takeRequest();
-        if (currentRequest == null) {
-          // All different types of resource request from current order is done, move to next one
-          // TODO: Need to handle order type as well
-          runnableContainerRequests.poll();
-        }
-      }
-      // Nothing in provision, makes the next batch of provision request
-      if (provisioning.isEmpty() && currentRequest != null) {
-        addContainerRequests(currentRequest.getKey(), currentRequest.getValue(), provisioning);
-        currentRequest = null;
-      }
-
-      nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
-
-      if (isRunning()) {
-        TimeUnit.SECONDS.sleep(1);
-      }
-    }
-  }
-
-  /**
-   * Handling containers that are completed.
-   */
-  private void handleCompleted(List<YarnContainerStatus> completedContainersStatuses) {
-    Multiset<String> restartRunnables = HashMultiset.create();
-    for (YarnContainerStatus status : completedContainersStatuses) {
-      LOG.info("Container {} completed with {}:{}.",
-               status.getContainerId(), status.getState(), status.getDiagnostics());
-      runningContainers.handleCompleted(status, restartRunnables);
-    }
-
-    for (Multiset.Entry<String> entry : restartRunnables.entrySet()) {
-      LOG.info("Re-request container for {} with {} instances.", entry.getElement(), entry.getCount());
-      for (int i = 0; i < entry.getCount(); i++) {
-        runnableContainerRequests.add(createRunnableContainerRequest(entry.getElement()));
-      }
-    }
-
-    // For all runnables that needs to re-request for containers, update the expected count timestamp
-    // so that the EventHandler would triggered with the right expiration timestamp.
-    expectedContainers.updateRequestTime(restartRunnables.elementSet());
-  }
-
-  /**
-   * Check for containers provision timeout and invoke eventHandler if necessary.
-   *
-   * @return the timestamp for the next time this method needs to be called.
-   */
-  private long checkProvisionTimeout(long nextTimeoutCheck) {
-    if (System.currentTimeMillis() < nextTimeoutCheck) {
-      return nextTimeoutCheck;
-    }
-
-    // Invoke event handler for provision request timeout
-    Map<String, ExpectedContainers.ExpectedCount> expiredRequests = expectedContainers.getAll();
-    Map<String, Integer> runningCounts = runningContainers.countAll();
-
-    List<EventHandler.TimeoutEvent> timeoutEvents = Lists.newArrayList();
-    for (Map.Entry<String, ExpectedContainers.ExpectedCount> entry : expiredRequests.entrySet()) {
-      String runnableName = entry.getKey();
-      ExpectedContainers.ExpectedCount expectedCount = entry.getValue();
-      int runningCount = runningCounts.containsKey(runnableName) ? runningCounts.get(runnableName) : 0;
-      if (expectedCount.getCount() != runningCount) {
-        timeoutEvents.add(new EventHandler.TimeoutEvent(runnableName, expectedCount.getCount(),
-                                                                   runningCount, expectedCount.getTimestamp()));
-      }
-    }
-
-    if (!timeoutEvents.isEmpty()) {
-      try {
-        EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents);
-        if (action.getTimeout() < 0) {
-          // Abort application
-          stop();
-        } else {
-          return nextTimeoutCheck + action.getTimeout();
-        }
-      } catch (Throwable t) {
-        LOG.warn("Exception when calling EventHandler {}. Ignore the result.", t);
-      }
-    }
-    return nextTimeoutCheck + Constants.PROVISION_TIMEOUT;
-  }
-
-  private Credentials createCredentials() {
-    Credentials credentials = new Credentials();
-    if (!UserGroupInformation.isSecurityEnabled()) {
-      return credentials;
-    }
-
-    try {
-      credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
-
-      // Remove the AM->RM tokens
-      Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-      while (iter.hasNext()) {
-        Token<?> token = iter.next();
-        if (token.getKind().equals(AMRM_TOKEN_KIND_NAME)) {
-          iter.remove();
-        }
-      }
-    } catch (IOException e) {
-      LOG.warn("Failed to get current user. No credentials will be provided to containers.", e);
-    }
-
-    return credentials;
-  }
-
-  private Queue<RunnableContainerRequest> initContainerRequests() {
-    // Orderly stores container requests.
-    Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
-    // For each order in the twillSpec, create container request for each runnable.
-    for (TwillSpecification.Order order : twillSpec.getOrders()) {
-      // Group container requests based on resource requirement.
-      ImmutableMultimap.Builder<Resource, RuntimeSpecification> builder = ImmutableMultimap.builder();
-      for (String runnableName : order.getNames()) {
-        RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
-        Resource capability = createCapability(runtimeSpec.getResourceSpecification());
-        builder.put(capability, runtimeSpec);
-      }
-      requests.add(new RunnableContainerRequest(order.getType(), builder.build()));
-    }
-    return requests;
-  }
-
-  /**
-   * Adds container requests with the given resource capability for each runtime.
-   */
-  private void addContainerRequests(Resource capability,
-                                    Collection<RuntimeSpecification> runtimeSpecs,
-                                    Queue<ProvisionRequest> provisioning) {
-    for (RuntimeSpecification runtimeSpec : runtimeSpecs) {
-      String name = runtimeSpec.getName();
-      int newContainers = expectedContainers.getExpected(name) - runningContainers.count(name);
-      if (newContainers > 0) {
-        // TODO: Allow user to set priority?
-        LOG.info("Request {} container with capability {}", newContainers, capability);
-        String requestId = amClient.addContainerRequest(capability, newContainers).setPriority(0).apply();
-        provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers));
-      }
-    }
-  }
-
-  /**
-   * Launches runnables in the provisioned containers.
-   */
-  private void launchRunnable(List<ProcessLauncher<YarnContainerInfo>> launchers,
-                              Queue<ProvisionRequest> provisioning) {
-    for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
-      LOG.info("Got container {}", processLauncher.getContainerInfo().getId());
-      ProvisionRequest provisionRequest = provisioning.peek();
-      if (provisionRequest == null) {
-        continue;
-      }
-
-      String runnableName = provisionRequest.getRuntimeSpec().getName();
-      LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
-
-      int containerCount = expectedContainers.getExpected(runnableName);
-
-      ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(
-        ImmutableMap.<String, String>builder()
-          .put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR))
-          .put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER))
-          .put(EnvKeys.TWILL_APP_RUN_ID, runId.getId())
-          .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
-          .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
-          .put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect())
-          .build()
-        , getLocalizeFiles(), credentials
-      );
-
-      TwillContainerLauncher launcher = new TwillContainerLauncher(
-        twillSpec.getRunnables().get(runnableName), launchContext,
-        ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
-        containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
-
-      runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
-
-      // Need to call complete to workaround bug in YARN AMRMClient
-      if (provisionRequest.containerAcquired()) {
-        amClient.completeContainerRequest(provisionRequest.getRequestId());
-      }
-
-      if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName)) {
-        LOG.info("Runnable " + runnableName + " fully provisioned with " + containerCount + " instances.");
-        provisioning.poll();
-      }
-    }
-  }
-
-  private List<LocalFile> getLocalizeFiles() {
-    try {
-      Reader reader = Files.newReader(new File(Constants.Files.LOCALIZE_FILES), Charsets.UTF_8);
-      try {
-        return new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
-                                .create().fromJson(reader, new TypeToken<List<LocalFile>>() {}.getType());
-      } finally {
-        reader.close();
-      }
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private String getZKNamespace(String runnableName) {
-    return String.format("/%s/runnables/%s", runId.getId(), runnableName);
-  }
-
-  private String getKafkaZKConnect() {
-    return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
-  }
-
-  private Properties generateKafkaConfig() {
-    int port = Networks.getRandomPort();
-    Preconditions.checkState(port > 0, "Failed to get random port.");
-
-    Properties prop = new Properties();
-    prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
-    prop.setProperty("zk.connect", getKafkaZKConnect());
-    prop.setProperty("num.threads", "8");
-    prop.setProperty("port", Integer.toString(port));
-    prop.setProperty("log.flush.interval", "10000");
-    prop.setProperty("max.socket.request.bytes", "104857600");
-    prop.setProperty("log.cleanup.interval.mins", "1");
-    prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
-    prop.setProperty("zk.connectiontimeout.ms", "1000000");
-    prop.setProperty("socket.receive.buffer", "1048576");
-    prop.setProperty("enable.zookeeper", "true");
-    prop.setProperty("log.retention.hours", "24");
-    prop.setProperty("brokerid", "0");
-    prop.setProperty("socket.send.buffer", "1048576");
-    prop.setProperty("num.partitions", "1");
-    prop.setProperty("log.file.size", "536870912");
-    prop.setProperty("log.default.flush.interval.ms", "1000");
-    return prop;
-  }
-
-  private ListenableFuture<String> processMessage(final String messageId, Message message) {
-    LOG.debug("Message received: {} {}.", messageId, message);
-
-    SettableFuture<String> result = SettableFuture.create();
-    Runnable completion = getMessageCompletion(messageId, result);
-
-    if (handleSecureStoreUpdate(message)) {
-      runningContainers.sendToAll(message, completion);
-      return result;
-    }
-
-    if (handleSetInstances(message, completion)) {
-      return result;
-    }
-
-    // Replicate messages to all runnables
-    if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
-      runningContainers.sendToAll(message, completion);
-      return result;
-    }
-
-    // Replicate message to a particular runnable.
-    if (message.getScope() == Message.Scope.RUNNABLE) {
-      runningContainers.sendToRunnable(message.getRunnableName(), message, completion);
-      return result;
-    }
-
-    LOG.info("Message ignored. {}", message);
-    return Futures.immediateFuture(messageId);
-  }
-
-  /**
-   * Attempts to change the number of running instances.
-   * @return {@code true} if the message does requests for changes in number of running instances of a runnable,
-   *         {@code false} otherwise.
-   */
-  private boolean handleSetInstances(final Message message, final Runnable completion) {
-    if (message.getType() != Message.Type.SYSTEM || message.getScope() != Message.Scope.RUNNABLE) {
-      return false;
-    }
-
-    Command command = message.getCommand();
-    Map<String, String> options = command.getOptions();
-    if (!"instances".equals(command.getCommand()) || !options.containsKey("count")) {
-      return false;
-    }
-
-    final String runnableName = message.getRunnableName();
-    if (runnableName == null || runnableName.isEmpty() || !twillSpec.getRunnables().containsKey(runnableName)) {
-      LOG.info("Unknown runnable {}", runnableName);
-      return false;
-    }
-
-    final int newCount = Integer.parseInt(options.get("count"));
-    final int oldCount = expectedContainers.getExpected(runnableName);
-
-    LOG.info("Received change instances request for {}, from {} to {}.", runnableName, oldCount, newCount);
-
-    if (newCount == oldCount) {   // Nothing to do, simply complete the request.
-      completion.run();
-      return true;
-    }
-
-    instanceChangeExecutor.execute(createSetInstanceRunnable(message, completion, oldCount, newCount));
-    return true;
-  }
-
-  /**
-   * Creates a Runnable for execution of change instance request.
-   */
-  private Runnable createSetInstanceRunnable(final Message message, final Runnable completion,
-                                             final int oldCount, final int newCount) {
-    return new Runnable() {
-      @Override
-      public void run() {
-        final String runnableName = message.getRunnableName();
-
-        LOG.info("Processing change instance request for {}, from {} to {}.", runnableName, oldCount, newCount);
-        try {
-          // Wait until running container count is the same as old count
-          runningContainers.waitForCount(runnableName, oldCount);
-          LOG.info("Confirmed {} containers running for {}.", oldCount, runnableName);
-
-          expectedContainers.setExpected(runnableName, newCount);
-
-          try {
-            if (newCount < oldCount) {
-              // Shutdown some running containers
-              for (int i = 0; i < oldCount - newCount; i++) {
-                runningContainers.removeLast(runnableName);
-              }
-            } else {
-              // Increase the number of instances
-              runnableContainerRequests.add(createRunnableContainerRequest(runnableName));
-            }
-          } finally {
-            runningContainers.sendToRunnable(runnableName, message, completion);
-            LOG.info("Change instances request completed. From {} to {}.", oldCount, newCount);
-          }
-        } catch (InterruptedException e) {
-          // If the wait is being interrupted, discard the message.
-          completion.run();
-        }
-      }
-    };
-  }
-
-  private RunnableContainerRequest createRunnableContainerRequest(final String runnableName) {
-    // Find the current order of the given runnable in order to create a RunnableContainerRequest.
-    TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
-      @Override
-      public boolean apply(TwillSpecification.Order input) {
-        return (input.getNames().contains(runnableName));
-      }
-    });
-
-    RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
-    Resource capability = createCapability(runtimeSpec.getResourceSpecification());
-    return new RunnableContainerRequest(order.getType(), ImmutableMultimap.of(capability, runtimeSpec));
-  }
-
-  private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {
-    return new Runnable() {
-      @Override
-      public void run() {
-        future.set(messageId);
-      }
-    };
-  }
-
-  private Resource createCapability(ResourceSpecification resourceSpec) {
-    Resource capability = Records.newRecord(Resource.class);
-
-    if (!YarnUtils.setVirtualCores(capability, resourceSpec.getVirtualCores())) {
-      LOG.debug("Virtual cores limit not supported.");
-    }
-
-    capability.setMemory(resourceSpec.getMemorySize());
-    return capability;
-  }
-
-  @Override
-  protected Service getServiceDelegate() {
-    return serviceDelegate;
-  }
-
-  /**
-   * A private class for service lifecycle. It's done this way so that we can have {@link ZKServiceDecorator} to
-   * wrap around this to reflect status in ZK.
-   */
-  private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
-
-    private volatile Thread runThread;
-
-    @Override
-    protected void run() throws Exception {
-      runThread = Thread.currentThread();
-      try {
-        doRun();
-      } catch (InterruptedException e) {
-        // It's ok to get interrupted exception, as it's a signal to stop
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    @Override
-    protected void startUp() throws Exception {
-      doStart();
-    }
-
-    @Override
-    protected void shutDown() throws Exception {
-      doStop();
-    }
-
-    @Override
-    protected void triggerShutdown() {
-      Thread runThread = this.runThread;
-      if (runThread != null) {
-        runThread.interrupt();
-      }
-    }
-
-    @Override
-    public ListenableFuture<String> onReceived(String messageId, Message message) {
-      return processMessage(messageId, message);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
deleted file mode 100644
index 931c5ef..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.yarn.YarnApplicationReport;
-import org.apache.twill.internal.yarn.YarnLaunchContext;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-/**
- * Interface for submitting a new application to run.
- */
-public interface ApplicationSubmitter {
-
-  ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability);
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
deleted file mode 100644
index 1769910..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.api.EventHandlerContext;
-import org.apache.twill.api.EventHandlerSpecification;
-
-/**
- *
- */
-final class BasicEventHandlerContext implements EventHandlerContext {
-
-  private final EventHandlerSpecification specification;
-
-  BasicEventHandlerContext(EventHandlerSpecification specification) {
-    this.specification = specification;
-  }
-
-  @Override
-  public EventHandlerSpecification getSpecification() {
-    return specification;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
deleted file mode 100644
index f4ebbd0..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-import java.util.Map;
-
-/**
- * This class hold information about the expected container count for each runnable. It also
- * keep track of the timestamp where the expected count has been updated.
- */
-final class ExpectedContainers {
-
-  private final Map<String, ExpectedCount> expectedCounts;
-
-  ExpectedContainers(Map<String, Integer> expected) {
-    expectedCounts = Maps.newHashMap();
-    long now = System.currentTimeMillis();
-
-    for (Map.Entry<String, Integer> entry : expected.entrySet()) {
-      expectedCounts.put(entry.getKey(), new ExpectedCount(entry.getValue(), now));
-    }
-  }
-
-  synchronized void setExpected(String runnable, int expected) {
-    expectedCounts.put(runnable, new ExpectedCount(expected, System.currentTimeMillis()));
-  }
-
-  /**
-   * Updates the ExpectCount timestamp to current time.
-   * @param runnables List of runnable names.
-   */
-  synchronized void updateRequestTime(Iterable<String> runnables) {
-    for (String runnable : runnables) {
-      ExpectedCount oldCount = expectedCounts.get(runnable);
-      expectedCounts.put(runnable, new ExpectedCount(oldCount.getCount(), System.currentTimeMillis()));
-    }
-  }
-
-  synchronized int getExpected(String runnable) {
-    return expectedCounts.get(runnable).getCount();
-  }
-
-  synchronized Map<String, ExpectedCount> getAll() {
-    return ImmutableMap.copyOf(expectedCounts);
-  }
-
-  static final class ExpectedCount {
-    private final int count;
-    private final long timestamp;
-
-    private ExpectedCount(int count, long timestamp) {
-      this.count = count;
-      this.timestamp = timestamp;
-    }
-
-    int getCount() {
-      return count;
-    }
-
-    long getTimestamp() {
-      return timestamp;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
deleted file mode 100644
index 2d41aa6..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.spi.LoggerContextListener;
-
-/**
- *
- */
-abstract class LoggerContextListenerAdapter implements LoggerContextListener {
-
-  private final boolean resetResistant;
-
-  protected LoggerContextListenerAdapter(boolean resetResistant) {
-    this.resetResistant = resetResistant;
-  }
-
-  @Override
-  public final boolean isResetResistant() {
-    return resetResistant;
-  }
-
-  @Override
-  public void onStart(LoggerContext context) {
-  }
-
-  @Override
-  public void onReset(LoggerContext context) {
-  }
-
-  @Override
-  public void onStop(LoggerContext context) {
-  }
-
-  @Override
-  public void onLevelChange(Logger logger, Level level) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
deleted file mode 100644
index 002d2a5..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.api.RuntimeSpecification;
-
-/**
- * Package private class to help AM to track in progress container request.
- */
-final class ProvisionRequest {
-  private final RuntimeSpecification runtimeSpec;
-  private final String requestId;
-  private int requestCount;
-
-  ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount) {
-    this.runtimeSpec = runtimeSpec;
-    this.requestId = requestId;
-    this.requestCount = requestCount;
-  }
-
-  RuntimeSpecification getRuntimeSpec() {
-    return runtimeSpec;
-  }
-
-  String getRequestId() {
-    return requestId;
-  }
-
-  /**
-   * Called to notify a container has been provision for this request.
-   * @return {@code true} if the requested container count has been provisioned.
-   */
-  boolean containerAcquired() {
-    requestCount--;
-    return requestCount == 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
deleted file mode 100644
index 7f28443..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillSpecification;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * Data structure for holding set of runnable specifications based on resource capability.
- */
-final class RunnableContainerRequest {
-  private final TwillSpecification.Order.Type orderType;
-  private final Iterator<Map.Entry<Resource, Collection<RuntimeSpecification>>> requests;
-
-  RunnableContainerRequest(TwillSpecification.Order.Type orderType,
-                           Multimap<Resource, RuntimeSpecification> requests) {
-    this.orderType = orderType;
-    this.requests = requests.asMap().entrySet().iterator();
-  }
-
-  TwillSpecification.Order.Type getOrderType() {
-    return orderType;
-  }
-
-  /**
-   * Remove a resource request and return it.
-   * @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification} or
-   *         {@code null} if there is no more request.
-   */
-  Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> takeRequest() {
-    Map.Entry<Resource, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
-    return next == null ? null : Maps.immutableEntry(next.getKey(), ImmutableList.copyOf(next.getValue()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
deleted file mode 100644
index b4b27a9..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.ProcessController;
-import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
-import org.apache.twill.internal.yarn.YarnContainerInfo;
-import org.apache.twill.internal.yarn.YarnLaunchContext;
-import org.apache.twill.internal.yarn.YarnNMClient;
-import com.google.common.base.Objects;
-import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- *
- */
-public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<YarnContainerInfo> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RunnableProcessLauncher.class);
-
-  private final YarnContainerInfo containerInfo;
-  private final YarnNMClient nmClient;
-  private boolean launched;
-
-  public RunnableProcessLauncher(YarnContainerInfo containerInfo, YarnNMClient nmClient) {
-    super(containerInfo);
-    this.containerInfo = containerInfo;
-    this.nmClient = nmClient;
-  }
-
-  @Override
-  public String toString() {
-    return Objects.toStringHelper(this)
-      .add("container", containerInfo)
-      .toString();
-  }
-
-  @Override
-  protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
-    Map<String, String> env = Maps.newHashMap(launchContext.getEnvironment());
-
-    // Set extra environments
-    env.put(EnvKeys.YARN_CONTAINER_ID, containerInfo.getId());
-    env.put(EnvKeys.YARN_CONTAINER_HOST, containerInfo.getHost().getHostName());
-    env.put(EnvKeys.YARN_CONTAINER_PORT, Integer.toString(containerInfo.getPort()));
-    env.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(containerInfo.getMemoryMB()));
-    env.put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(containerInfo.getVirtualCores()));
-
-    launchContext.setEnvironment(env);
-
-    LOG.info("Launching in container {}, {}", containerInfo.getId(), launchContext.getCommands());
-    final Cancellable cancellable = nmClient.start(containerInfo, launchContext);
-    launched = true;
-
-    return new ProcessController<R>() {
-      @Override
-      public R getReport() {
-        // No reporting support for runnable launch yet.
-        return null;
-
-      }
-
-      @Override
-      public void cancel() {
-        cancellable.cancel();
-      }
-    };
-  }
-
-  public boolean isLaunched() {
-    return launched;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
deleted file mode 100644
index beef0d4..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.RunId;
-import org.apache.twill.api.ServiceController;
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.internal.ContainerInfo;
-import org.apache.twill.internal.DefaultResourceReport;
-import org.apache.twill.internal.DefaultTwillRunResources;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.internal.TwillContainerController;
-import org.apache.twill.internal.TwillContainerLauncher;
-import org.apache.twill.internal.container.TwillContainerMain;
-import org.apache.twill.internal.state.Message;
-import org.apache.twill.internal.yarn.YarnContainerStatus;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multiset;
-import com.google.common.collect.Table;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A helper class for ApplicationMasterService to keep track of running containers and to interact
- * with them.
- */
-final class RunningContainers {
-  private static final Logger LOG = LoggerFactory.getLogger(RunningContainers.class);
-
-  /**
-   * Function to return cardinality of a given BitSet.
-   */
-  private static final Function<BitSet, Integer> BITSET_CARDINALITY = new Function<BitSet, Integer>() {
-    @Override
-    public Integer apply(BitSet input) {
-      return input.cardinality();
-    }
-  };
-
-  // Table of <runnableName, containerId, controller>
-  private final Table<String, String, TwillContainerController> containers;
-
-  // Map from runnableName to a BitSet, with the <instanceId> bit turned on for having an instance running.
-  private final Map<String, BitSet> runnableInstances;
-  private final DefaultResourceReport resourceReport;
-  private final Deque<String> startSequence;
-  private final Lock containerLock;
-  private final Condition containerChange;
-
-  RunningContainers(String appId, TwillRunResources appMasterResources) {
-    containers = HashBasedTable.create();
-    runnableInstances = Maps.newHashMap();
-    startSequence = Lists.newLinkedList();
-    containerLock = new ReentrantLock();
-    containerChange = containerLock.newCondition();
-    resourceReport = new DefaultResourceReport(appId, appMasterResources);
-  }
-
-  /**
-   * Returns {@code true} if there is no live container.
-   */
-  boolean isEmpty() {
-    containerLock.lock();
-    try {
-      return runnableInstances.isEmpty();
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) {
-    containerLock.lock();
-    try {
-      int instanceId = getStartInstanceId(runnableName);
-      RunId runId = getRunId(runnableName, instanceId);
-      TwillContainerController controller = launcher.start(runId, instanceId,
-                                                           TwillContainerMain.class, "$HADOOP_CONF_DIR");
-      containers.put(runnableName, containerInfo.getId(), controller);
-
-      TwillRunResources resources = new DefaultTwillRunResources(instanceId,
-                                                                 containerInfo.getId(),
-                                                                 containerInfo.getVirtualCores(),
-                                                                 containerInfo.getMemoryMB(),
-                                                                 containerInfo.getHost().getHostName());
-      resourceReport.addRunResources(runnableName, resources);
-
-      if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) {
-        startSequence.addLast(runnableName);
-      }
-      containerChange.signalAll();
-
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  ResourceReport getResourceReport() {
-    return resourceReport;
-  }
-
-  /**
-   * Stops and removes the last running container of the given runnable.
-   */
-  void removeLast(String runnableName) {
-    containerLock.lock();
-    try {
-      int maxInstanceId = getMaxInstanceId(runnableName);
-      if (maxInstanceId < 0) {
-        LOG.warn("No running container found for {}", runnableName);
-        return;
-      }
-
-      String lastContainerId = null;
-      TwillContainerController lastController = null;
-
-      // Find the controller with the maxInstanceId
-      for (Map.Entry<String, TwillContainerController> entry : containers.row(runnableName).entrySet()) {
-        if (getInstanceId(entry.getValue().getRunId()) == maxInstanceId) {
-          lastContainerId = entry.getKey();
-          lastController = entry.getValue();
-          break;
-        }
-      }
-
-      Preconditions.checkState(lastContainerId != null,
-                               "No container found for {} with instanceId = {}", runnableName, maxInstanceId);
-
-      LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
-      lastController.stopAndWait();
-      containers.remove(runnableName, lastContainerId);
-      removeInstanceId(runnableName, maxInstanceId);
-      resourceReport.removeRunnableResources(runnableName, lastContainerId);
-      containerChange.signalAll();
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  /**
-   * Blocks until there are changes in running containers.
-   */
-  void waitForCount(String runnableName, int count) throws InterruptedException {
-    containerLock.lock();
-    try {
-      while (getRunningInstances(runnableName) != count) {
-        containerChange.await();
-      }
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  /**
-   * Returns the number of running instances of the given runnable.
-   */
-  int count(String runnableName) {
-    containerLock.lock();
-    try {
-      return getRunningInstances(runnableName);
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  /**
-   * Returns a Map contains running instances of all runnables.
-   */
-  Map<String, Integer> countAll() {
-    containerLock.lock();
-    try {
-      return ImmutableMap.copyOf(Maps.transformValues(runnableInstances, BITSET_CARDINALITY));
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  void sendToAll(Message message, Runnable completion) {
-    containerLock.lock();
-    try {
-      if (containers.isEmpty()) {
-        completion.run();
-      }
-
-      // Sends the command to all running containers
-      AtomicInteger count = new AtomicInteger(containers.size());
-      for (Map.Entry<String, Map<String, TwillContainerController>> entry : containers.rowMap().entrySet()) {
-        for (TwillContainerController controller : entry.getValue().values()) {
-          sendMessage(entry.getKey(), message, controller, count, completion);
-        }
-      }
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  void sendToRunnable(String runnableName, Message message, Runnable completion) {
-    containerLock.lock();
-    try {
-      Collection<TwillContainerController> controllers = containers.row(runnableName).values();
-      if (controllers.isEmpty()) {
-        completion.run();
-      }
-
-      AtomicInteger count = new AtomicInteger(controllers.size());
-      for (TwillContainerController controller : controllers) {
-        sendMessage(runnableName, message, controller, count, completion);
-      }
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  /**
-   * Stops all running services. Only called when the AppMaster stops.
-   */
-  void stopAll() {
-    containerLock.lock();
-    try {
-      // Stop it one by one in reverse order of start sequence
-      Iterator<String> itor = startSequence.descendingIterator();
-      List<ListenableFuture<ServiceController.State>> futures = Lists.newLinkedList();
-      while (itor.hasNext()) {
-        String runnableName = itor.next();
-        LOG.info("Stopping all instances of " + runnableName);
-
-        futures.clear();
-        // Parallel stops all running containers of the current runnable.
-        for (TwillContainerController controller : containers.row(runnableName).values()) {
-          futures.add(controller.stop());
-        }
-        // Wait for containers to stop. Assumes the future returned by Futures.successfulAsList won't throw exception.
-        Futures.getUnchecked(Futures.successfulAsList(futures));
-
-        LOG.info("Terminated all instances of " + runnableName);
-      }
-      containers.clear();
-      runnableInstances.clear();
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  Set<String> getContainerIds() {
-    containerLock.lock();
-    try {
-      return ImmutableSet.copyOf(containers.columnKeySet());
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  /**
-   * Handle completion of container.
-   * @param status The completion status.
-   * @param restartRunnables Set of runnable names that requires restart.
-   */
-  void handleCompleted(YarnContainerStatus status, Multiset<String> restartRunnables) {
-    containerLock.lock();
-    String containerId = status.getContainerId();
-    int exitStatus = status.getExitStatus();
-    ContainerState state = status.getState();
-
-    try {
-      Map<String, TwillContainerController> lookup = containers.column(containerId);
-      if (lookup.isEmpty()) {
-        // It's OK because if a container is stopped through removeLast, this would be empty.
-        return;
-      }
-
-      if (lookup.size() != 1) {
-        LOG.warn("More than one controller found for container {}", containerId);
-      }
-
-      if (exitStatus != 0) {
-        LOG.warn("Container {} exited abnormally with state {}, exit code {}. Re-request the container.",
-                 containerId, state, exitStatus);
-        restartRunnables.add(lookup.keySet().iterator().next());
-      } else {
-        LOG.info("Container {} exited normally with state {}", containerId, state);
-      }
-
-      for (Map.Entry<String, TwillContainerController> completedEntry : lookup.entrySet()) {
-        String runnableName = completedEntry.getKey();
-        TwillContainerController controller = completedEntry.getValue();
-        controller.completed(exitStatus);
-
-        removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
-        resourceReport.removeRunnableResources(runnableName, containerId);
-      }
-
-      lookup.clear();
-      containerChange.signalAll();
-    } finally {
-      containerLock.unlock();
-    }
-  }
-
-  /**
-   * Sends a command through the given {@link org.apache.twill.internal.TwillContainerController} of a runnable. Decrements the count
-   * when the sending of command completed. Triggers completion when count reaches zero.
-   */
-  private void sendMessage(final String runnableName, final Message message,
-                           final TwillContainerController controller, final AtomicInteger count,
-                           final Runnable completion) {
-    Futures.addCallback(controller.sendMessage(message), new FutureCallback<Message>() {
-      @Override
-      public void onSuccess(Message result) {
-        if (count.decrementAndGet() == 0) {
-          completion.run();
-        }
-      }
-
-      @Override
-      public void onFailure(Throwable t) {
-        try {
-          LOG.error("Failed to send message. Runnable: {}, RunId: {}, Message: {}.",
-                    runnableName, controller.getRunId(), message, t);
-        } finally {
-          if (count.decrementAndGet() == 0) {
-            completion.run();
-          }
-        }
-      }
-    });
-  }
-
-  /**
-   * Returns the instanceId to start the given runnable.
-   */
-  private int getStartInstanceId(String runnableName) {
-    BitSet instances = runnableInstances.get(runnableName);
-    if (instances == null) {
-      instances = new BitSet();
-      runnableInstances.put(runnableName, instances);
-    }
-    int instanceId = instances.nextClearBit(0);
-    instances.set(instanceId);
-    return instanceId;
-  }
-
-  private void removeInstanceId(String runnableName, int instanceId) {
-    BitSet instances = runnableInstances.get(runnableName);
-    if (instances == null) {
-      return;
-    }
-    instances.clear(instanceId);
-    if (instances.isEmpty()) {
-      runnableInstances.remove(runnableName);
-    }
-  }
-
-  /**
-   * Returns the largest instanceId for the given runnable. Returns -1 if no container is running.
-   */
-  private int getMaxInstanceId(String runnableName) {
-    BitSet instances = runnableInstances.get(runnableName);
-    if (instances == null || instances.isEmpty()) {
-      return -1;
-    }
-    return instances.length() - 1;
-  }
-
-  /**
-   * Returns nnumber of running instances for the given runnable.
-   */
-  private int getRunningInstances(String runableName) {
-    BitSet instances = runnableInstances.get(runableName);
-    return instances == null ? 0 : instances.cardinality();
-  }
-
-  private RunId getRunId(String runnableName, int instanceId) {
-    RunId baseId;
-
-    Collection<TwillContainerController> controllers = containers.row(runnableName).values();
-    if (controllers.isEmpty()) {
-      baseId = RunIds.generate();
-    } else {
-      String id = controllers.iterator().next().getRunId().getId();
-      baseId = RunIds.fromString(id.substring(0, id.lastIndexOf('-')));
-    }
-
-    return RunIds.fromString(baseId.getId() + '-' + instanceId);
-  }
-
-  private int getInstanceId(RunId runId) {
-    String id = runId.getId();
-    return Integer.parseInt(id.substring(id.lastIndexOf('-') + 1));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
deleted file mode 100644
index ca299e0..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.appmaster;
-
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.internal.json.ResourceReportAdapter;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpContentCompressor;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.jboss.netty.util.CharsetUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URL;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Webservice that the Application Master will register back to the resource manager
- * for clients to track application progress.  Currently used purely for getting a
- * breakdown of resource usage as a {@link org.apache.twill.api.ResourceReport}.
- */
-public final class TrackerService extends AbstractIdleService {
-
-  // TODO: This is temporary. When support more REST API, this would get moved.
-  public static final String PATH = "/resources";
-
-  private static final Logger LOG  = LoggerFactory.getLogger(TrackerService.class);
-  private static final int NUM_BOSS_THREADS = 1;
-  private static final int CLOSE_CHANNEL_TIMEOUT = 5;
-  private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
-
-  private final String host;
-  private ServerBootstrap bootstrap;
-  private InetSocketAddress bindAddress;
-  private URL url;
-  private final ChannelGroup channelGroup;
-  private final ResourceReport resourceReport;
-
-  /**
-   * Initialize the service.
-   *
-   * @param resourceReport live report that the service will return to clients.
-   * @param appMasterHost the application master host.
-   */
-  public TrackerService(ResourceReport resourceReport, String appMasterHost) {
-    this.channelGroup = new DefaultChannelGroup("appMasterTracker");
-    this.resourceReport = resourceReport;
-    this.host = appMasterHost;
-  }
-
-  /**
-   * Returns the address this tracker service is bounded to.
-   */
-  public InetSocketAddress getBindAddress() {
-    return bindAddress;
-  }
-
-  /**
-   * @return tracker url.
-   */
-  public URL getUrl() {
-    return url;
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS,
-                                                        new ThreadFactoryBuilder()
-                                                          .setDaemon(true)
-                                                          .setNameFormat("boss-thread")
-                                                          .build());
-
-    Executor workerThreads = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-                                                             .setDaemon(true)
-                                                             .setNameFormat("worker-thread#%d")
-                                                             .build());
-
-    ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, workerThreads);
-
-    bootstrap = new ServerBootstrap(factory);
-
-    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
-      public ChannelPipeline getPipeline() {
-        ChannelPipeline pipeline = Channels.pipeline();
-
-        pipeline.addLast("decoder", new HttpRequestDecoder());
-        pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
-        pipeline.addLast("encoder", new HttpResponseEncoder());
-        pipeline.addLast("compressor", new HttpContentCompressor());
-        pipeline.addLast("handler", new ReportHandler(resourceReport));
-
-        return pipeline;
-      }
-    });
-
-    Channel channel = bootstrap.bind(new InetSocketAddress(host, 0));
-    bindAddress = (InetSocketAddress) channel.getLocalAddress();
-    url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort()))
-             .resolve(TrackerService.PATH).toURL();
-    channelGroup.add(channel);
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    try {
-      if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) {
-        LOG.warn("Timeout when closing all channels.");
-      }
-    } finally {
-      bootstrap.releaseExternalResources();
-    }
-  }
-
-  /**
-   * Handler to return resources used by this application master, which will be available through
-   * the host and port set when this application master registered itself to the resource manager.
-   */
-  public class ReportHandler extends SimpleChannelUpstreamHandler {
-    private final ResourceReport report;
-    private final ResourceReportAdapter reportAdapter;
-
-    public ReportHandler(ResourceReport report) {
-      this.report = report;
-      this.reportAdapter = ResourceReportAdapter.create();
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-      HttpRequest request = (HttpRequest) e.getMessage();
-      if (!isValid(request)) {
-        write404(e);
-        return;
-      }
-
-      writeResponse(e);
-    }
-
-    // only accepts GET on /resources for now
-    private boolean isValid(HttpRequest request) {
-      return (request.getMethod() == HttpMethod.GET) && PATH.equals(request.getUri());
-    }
-
-    private void write404(MessageEvent e) {
-      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
-      ChannelFuture future = e.getChannel().write(response);
-      future.addListener(ChannelFutureListener.CLOSE);
-    }
-
-    private void writeResponse(MessageEvent e) {
-      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-      response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=UTF-8");
-
-      ChannelBuffer content = ChannelBuffers.dynamicBuffer();
-      Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
-      reportAdapter.toJson(report, writer);
-      try {
-        writer.close();
-      } catch (IOException e1) {
-        LOG.error("error writing resource report", e1);
-      }
-      response.setContent(content);
-      ChannelFuture future = e.getChannel().write(response);
-      future.addListener(ChannelFutureListener.CLOSE);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-      e.getChannel().close();
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/35dfccc4/yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
deleted file mode 100644
index bf8e677..0000000
--- a/yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package contains implementation of Twill application master.
- */
-package org.apache.twill.internal.appmaster;