You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2013/11/21 22:54:29 UTC
[06/15] Initial import commit.
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
new file mode 100644
index 0000000..b51bb63
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterProcessLauncher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
+import org.apache.twill.internal.yarn.YarnLaunchContext;
+import org.apache.twill.internal.yarn.YarnUtils;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Map;
+
+/**
+ * A {@link org.apache.twill.internal.ProcessLauncher} for launching Application Master from the client.
+ */
+public final class ApplicationMasterProcessLauncher extends AbstractYarnProcessLauncher<ApplicationId> {
+
+ private final ApplicationSubmitter submitter;
+
+ public ApplicationMasterProcessLauncher(ApplicationId appId, ApplicationSubmitter submitter) {
+ super(appId);
+ this.submitter = submitter;
+ }
+
+ @Override
+ protected boolean useArchiveSuffix() {
+ return false;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
+ final ApplicationId appId = getContainerInfo();
+
+ // Set the resource requirement for AM
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(Constants.APP_MASTER_MEMORY_MB);
+ YarnUtils.setVirtualCores(capability, 1);
+
+ // Put in extra environments
+ Map<String, String> env = ImmutableMap.<String, String>builder()
+ .putAll(launchContext.getEnvironment())
+ .put(EnvKeys.YARN_APP_ID, Integer.toString(appId.getId()))
+ .put(EnvKeys.YARN_APP_ID_CLUSTER_TIME, Long.toString(appId.getClusterTimestamp()))
+ .put(EnvKeys.YARN_APP_ID_STR, appId.toString())
+ .put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(Constants.APP_MASTER_MEMORY_MB))
+ .put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(YarnUtils.getVirtualCores(capability)))
+ .build();
+
+ launchContext.setEnvironment(env);
+ return (ProcessController<R>) submitter.submit(launchContext, capability);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
new file mode 100644
index 0000000..51c8503
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -0,0 +1,799 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.Command;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.common.Threads;
+import org.apache.twill.filesystem.Location;
+import org.apache.twill.internal.AbstractTwillService;
+import org.apache.twill.internal.Configs;
+import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.DefaultTwillRunResources;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.TwillContainerLauncher;
+import org.apache.twill.internal.ZKServiceDecorator;
+import org.apache.twill.internal.json.LocalFileCodec;
+import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
+import org.apache.twill.internal.logging.Loggings;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.state.MessageCallback;
+import org.apache.twill.internal.utils.Instances;
+import org.apache.twill.internal.utils.Networks;
+import org.apache.twill.internal.yarn.YarnAMClient;
+import org.apache.twill.internal.yarn.YarnAMClientFactory;
+import org.apache.twill.internal.yarn.YarnContainerInfo;
+import org.apache.twill.internal.yarn.YarnContainerStatus;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClients;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
+import com.google.common.io.CharStreams;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+public final class ApplicationMasterService extends AbstractTwillService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
+
+ // Copied from org.apache.hadoop.yarn.security.AMRMTokenIdentifier.KIND_NAME since it's missing in Hadoop-2.0
+ private static final Text AMRM_TOKEN_KIND_NAME = new Text("YARN_AM_RM_TOKEN");
+
+ private final RunId runId;
+ private final ZKClient zkClient;
+ private final TwillSpecification twillSpec;
+ private final ApplicationMasterLiveNodeData amLiveNode;
+ private final ZKServiceDecorator serviceDelegate;
+ private final RunningContainers runningContainers;
+ private final ExpectedContainers expectedContainers;
+ private final TrackerService trackerService;
+ private final YarnAMClient amClient;
+ private final String jvmOpts;
+ private final int reservedMemory;
+ private final EventHandler eventHandler;
+ private final Location applicationLocation;
+
+ private EmbeddedKafkaServer kafkaServer;
+ private Queue<RunnableContainerRequest> runnableContainerRequests;
+ private ExecutorService instanceChangeExecutor;
+
+ public ApplicationMasterService(RunId runId, ZKClient zkClient, File twillSpecFile,
+ YarnAMClientFactory amClientFactory, Location applicationLocation) throws Exception {
+ super(applicationLocation);
+
+ this.runId = runId;
+ this.twillSpec = TwillSpecificationAdapter.create().fromJson(twillSpecFile);
+ this.zkClient = zkClient;
+ this.applicationLocation = applicationLocation;
+ this.amClient = amClientFactory.create();
+ this.credentials = createCredentials();
+ this.jvmOpts = loadJvmOptions();
+ this.reservedMemory = getReservedMemory();
+
+ amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
+ Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
+ amClient.getContainerId().toString());
+
+ serviceDelegate = new ZKServiceDecorator(zkClient, runId, createLiveNodeDataSupplier(),
+ new ServiceDelegate(), new Runnable() {
+ @Override
+ public void run() {
+ amClient.stopAndWait();
+ }
+ });
+ expectedContainers = initExpectedContainers(twillSpec);
+ runningContainers = initRunningContainers(amClient.getContainerId(), amClient.getHost());
+ trackerService = new TrackerService(runningContainers.getResourceReport(), amClient.getHost());
+ eventHandler = createEventHandler(twillSpec);
+ }
+
+ private String loadJvmOptions() throws IOException {
+ final File jvmOptsFile = new File(Constants.Files.JVM_OPTIONS);
+ if (!jvmOptsFile.exists()) {
+ return "";
+ }
+
+ return CharStreams.toString(new InputSupplier<Reader>() {
+ @Override
+ public Reader getInput() throws IOException {
+ return new FileReader(jvmOptsFile);
+ }
+ });
+ }
+
+ private int getReservedMemory() {
+ String value = System.getenv(EnvKeys.TWILL_RESERVED_MEMORY_MB);
+ if (value == null) {
+ return Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
+ }
+ try {
+ return Integer.parseInt(value);
+ } catch (Exception e) {
+ return Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
+ }
+ }
+
+ private EventHandler createEventHandler(TwillSpecification twillSpec) {
+ try {
+ // Should be able to load by this class ClassLoader, as they packaged in the same jar.
+ EventHandlerSpecification handlerSpec = twillSpec.getEventHandler();
+
+ Class<?> handlerClass = getClass().getClassLoader().loadClass(handlerSpec.getClassName());
+ Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
+ "Class {} does not implements {}",
+ handlerClass, EventHandler.class.getName());
+ return Instances.newInstance((Class<? extends EventHandler>) handlerClass);
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private Supplier<? extends JsonElement> createLiveNodeDataSupplier() {
+ return new Supplier<JsonElement>() {
+ @Override
+ public JsonElement get() {
+ return new Gson().toJsonTree(amLiveNode);
+ }
+ };
+ }
+
+ private RunningContainers initRunningContainers(ContainerId appMasterContainerId,
+ String appMasterHost) throws Exception {
+ TwillRunResources appMasterResources = new DefaultTwillRunResources(
+ 0,
+ appMasterContainerId.toString(),
+ Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES)),
+ Integer.parseInt(System.getenv(EnvKeys.YARN_CONTAINER_MEMORY_MB)),
+ appMasterHost);
+ String appId = appMasterContainerId.getApplicationAttemptId().getApplicationId().toString();
+ return new RunningContainers(appId, appMasterResources);
+ }
+
+ private ExpectedContainers initExpectedContainers(TwillSpecification twillSpec) {
+ Map<String, Integer> expectedCounts = Maps.newHashMap();
+ for (RuntimeSpecification runtimeSpec : twillSpec.getRunnables().values()) {
+ expectedCounts.put(runtimeSpec.getName(), runtimeSpec.getResourceSpecification().getInstances());
+ }
+ return new ExpectedContainers(expectedCounts);
+ }
+
+ private void doStart() throws Exception {
+ LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec));
+
+ // initialize the event handler, if it fails, it will fail the application.
+ eventHandler.initialize(new BasicEventHandlerContext(twillSpec.getEventHandler()));
+
+ instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
+
+ kafkaServer = new EmbeddedKafkaServer(new File(Constants.Files.KAFKA), generateKafkaConfig());
+
+ // Must start tracker before start AMClient
+ LOG.info("Starting application master tracker server");
+ trackerService.startAndWait();
+ URL trackerUrl = trackerService.getUrl();
+ LOG.info("Started application master tracker server on " + trackerUrl);
+
+ amClient.setTracker(trackerService.getBindAddress(), trackerUrl);
+ amClient.startAndWait();
+
+ // Creates ZK path for runnable and kafka logging service
+ Futures.allAsList(ImmutableList.of(
+ zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
+ zkClient.create("/" + runId.getId() + "/kafka", null, CreateMode.PERSISTENT))
+ ).get();
+
+ // Starts kafka server
+ LOG.info("Starting kafka server");
+
+ kafkaServer.startAndWait();
+ LOG.info("Kafka server started");
+
+ runnableContainerRequests = initContainerRequests();
+ }
+
+ private void doStop() throws Exception {
+ Thread.interrupted(); // This is just to clear the interrupt flag
+
+ LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec));
+
+ try {
+ // call event handler destroy. If there is error, only log and not affected stop sequence.
+ eventHandler.destroy();
+ } catch (Throwable t) {
+ LOG.warn("Exception when calling {}.destroy()", twillSpec.getEventHandler().getClassName(), t);
+ }
+
+ instanceChangeExecutor.shutdownNow();
+
+ // For checking if all containers are stopped.
+ final Set<String> ids = Sets.newHashSet(runningContainers.getContainerIds());
+ YarnAMClient.AllocateHandler handler = new YarnAMClient.AllocateHandler() {
+ @Override
+ public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
+ // no-op
+ }
+
+ @Override
+ public void completed(List<YarnContainerStatus> completed) {
+ for (YarnContainerStatus status : completed) {
+ ids.remove(status.getContainerId());
+ }
+ }
+ };
+
+ runningContainers.stopAll();
+
+ // Poll for 5 seconds to wait for containers to stop.
+ int count = 0;
+ while (!ids.isEmpty() && count++ < 5) {
+ amClient.allocate(0.0f, handler);
+ TimeUnit.SECONDS.sleep(1);
+ }
+
+ LOG.info("Stopping application master tracker server");
+ try {
+ trackerService.stopAndWait();
+ LOG.info("Stopped application master tracker server");
+ } catch (Exception e) {
+ LOG.error("Failed to stop tracker service.", e);
+ } finally {
+ try {
+ // App location cleanup
+ cleanupDir(URI.create(System.getenv(EnvKeys.TWILL_APP_DIR)));
+ Loggings.forceFlush();
+ // Sleep a short while to let kafka clients to have chance to fetch the log
+ TimeUnit.SECONDS.sleep(1);
+ } finally {
+ kafkaServer.stopAndWait();
+ LOG.info("Kafka server stopped");
+ }
+ }
+ }
+
+ private void cleanupDir(URI appDir) {
+ try {
+ if (applicationLocation.delete(true)) {
+ LOG.info("Application directory deleted: {}", appDir);
+ } else {
+ LOG.warn("Failed to cleanup directory {}.", appDir);
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while cleanup directory {}.", appDir, e);
+ }
+ }
+
+
+ private void doRun() throws Exception {
+ // The main loop
+ Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> currentRequest = null;
+ final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();
+
+ YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
+ @Override
+ public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
+ launchRunnable(launchers, provisioning);
+ }
+
+ @Override
+ public void completed(List<YarnContainerStatus> completed) {
+ handleCompleted(completed);
+ }
+ };
+
+ long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
+ while (isRunning()) {
+ // Call allocate. It has to be made at first in order to be able to get cluster resource availability.
+ amClient.allocate(0.0f, allocateHandler);
+
+ // Looks for containers requests.
+ if (provisioning.isEmpty() && runnableContainerRequests.isEmpty() && runningContainers.isEmpty()) {
+ LOG.info("All containers completed. Shutting down application master.");
+ break;
+ }
+
+ // If nothing is in provisioning, and no pending request, move to next one
+ while (provisioning.isEmpty() && currentRequest == null && !runnableContainerRequests.isEmpty()) {
+ currentRequest = runnableContainerRequests.peek().takeRequest();
+ if (currentRequest == null) {
+ // All different types of resource request from current order is done, move to next one
+ // TODO: Need to handle order type as well
+ runnableContainerRequests.poll();
+ }
+ }
+ // Nothing in provision, makes the next batch of provision request
+ if (provisioning.isEmpty() && currentRequest != null) {
+ addContainerRequests(currentRequest.getKey(), currentRequest.getValue(), provisioning);
+ currentRequest = null;
+ }
+
+ nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
+
+ if (isRunning()) {
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ }
+
+ /**
+ * Handling containers that are completed.
+ */
+ private void handleCompleted(List<YarnContainerStatus> completedContainersStatuses) {
+ Multiset<String> restartRunnables = HashMultiset.create();
+ for (YarnContainerStatus status : completedContainersStatuses) {
+ LOG.info("Container {} completed with {}:{}.",
+ status.getContainerId(), status.getState(), status.getDiagnostics());
+ runningContainers.handleCompleted(status, restartRunnables);
+ }
+
+ for (Multiset.Entry<String> entry : restartRunnables.entrySet()) {
+ LOG.info("Re-request container for {} with {} instances.", entry.getElement(), entry.getCount());
+ for (int i = 0; i < entry.getCount(); i++) {
+ runnableContainerRequests.add(createRunnableContainerRequest(entry.getElement()));
+ }
+ }
+
+ // For all runnables that needs to re-request for containers, update the expected count timestamp
+ // so that the EventHandler would triggered with the right expiration timestamp.
+ expectedContainers.updateRequestTime(restartRunnables.elementSet());
+ }
+
+ /**
+ * Check for containers provision timeout and invoke eventHandler if necessary.
+ *
+ * @return the timestamp for the next time this method needs to be called.
+ */
+ private long checkProvisionTimeout(long nextTimeoutCheck) {
+ if (System.currentTimeMillis() < nextTimeoutCheck) {
+ return nextTimeoutCheck;
+ }
+
+ // Invoke event handler for provision request timeout
+ Map<String, ExpectedContainers.ExpectedCount> expiredRequests = expectedContainers.getAll();
+ Map<String, Integer> runningCounts = runningContainers.countAll();
+
+ List<EventHandler.TimeoutEvent> timeoutEvents = Lists.newArrayList();
+ for (Map.Entry<String, ExpectedContainers.ExpectedCount> entry : expiredRequests.entrySet()) {
+ String runnableName = entry.getKey();
+ ExpectedContainers.ExpectedCount expectedCount = entry.getValue();
+ int runningCount = runningCounts.containsKey(runnableName) ? runningCounts.get(runnableName) : 0;
+ if (expectedCount.getCount() != runningCount) {
+ timeoutEvents.add(new EventHandler.TimeoutEvent(runnableName, expectedCount.getCount(),
+ runningCount, expectedCount.getTimestamp()));
+ }
+ }
+
+ if (!timeoutEvents.isEmpty()) {
+ try {
+ EventHandler.TimeoutAction action = eventHandler.launchTimeout(timeoutEvents);
+ if (action.getTimeout() < 0) {
+ // Abort application
+ stop();
+ } else {
+ return nextTimeoutCheck + action.getTimeout();
+ }
+ } catch (Throwable t) {
+ LOG.warn("Exception when calling EventHandler {}. Ignore the result.", t);
+ }
+ }
+ return nextTimeoutCheck + Constants.PROVISION_TIMEOUT;
+ }
+
+ private Credentials createCredentials() {
+ Credentials credentials = new Credentials();
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return credentials;
+ }
+
+ try {
+ credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
+
+ // Remove the AM->RM tokens
+ Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
+ while (iter.hasNext()) {
+ Token<?> token = iter.next();
+ if (token.getKind().equals(AMRM_TOKEN_KIND_NAME)) {
+ iter.remove();
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to get current user. No credentials will be provided to containers.", e);
+ }
+
+ return credentials;
+ }
+
+ private Queue<RunnableContainerRequest> initContainerRequests() {
+ // Orderly stores container requests.
+ Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
+ // For each order in the twillSpec, create container request for each runnable.
+ for (TwillSpecification.Order order : twillSpec.getOrders()) {
+ // Group container requests based on resource requirement.
+ ImmutableMultimap.Builder<Resource, RuntimeSpecification> builder = ImmutableMultimap.builder();
+ for (String runnableName : order.getNames()) {
+ RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+ Resource capability = createCapability(runtimeSpec.getResourceSpecification());
+ builder.put(capability, runtimeSpec);
+ }
+ requests.add(new RunnableContainerRequest(order.getType(), builder.build()));
+ }
+ return requests;
+ }
+
+ /**
+ * Adds container requests with the given resource capability for each runtime.
+ */
+ private void addContainerRequests(Resource capability,
+ Collection<RuntimeSpecification> runtimeSpecs,
+ Queue<ProvisionRequest> provisioning) {
+ for (RuntimeSpecification runtimeSpec : runtimeSpecs) {
+ String name = runtimeSpec.getName();
+ int newContainers = expectedContainers.getExpected(name) - runningContainers.count(name);
+ if (newContainers > 0) {
+ // TODO: Allow user to set priority?
+ LOG.info("Request {} container with capability {}", newContainers, capability);
+ String requestId = amClient.addContainerRequest(capability, newContainers).setPriority(0).apply();
+ provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers));
+ }
+ }
+ }
+
+ /**
+ * Launches runnables in the provisioned containers.
+ */
+ private void launchRunnable(List<ProcessLauncher<YarnContainerInfo>> launchers,
+ Queue<ProvisionRequest> provisioning) {
+ for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
+ LOG.info("Got container {}", processLauncher.getContainerInfo().getId());
+ ProvisionRequest provisionRequest = provisioning.peek();
+ if (provisionRequest == null) {
+ continue;
+ }
+
+ String runnableName = provisionRequest.getRuntimeSpec().getName();
+ LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
+
+ int containerCount = expectedContainers.getExpected(runnableName);
+
+ ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(
+ ImmutableMap.<String, String>builder()
+ .put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR))
+ .put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER))
+ .put(EnvKeys.TWILL_APP_RUN_ID, runId.getId())
+ .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
+ .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString())
+ .put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect())
+ .build()
+ , getLocalizeFiles(), credentials
+ );
+
+ TwillContainerLauncher launcher = new TwillContainerLauncher(
+ twillSpec.getRunnables().get(runnableName), launchContext,
+ ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
+ containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
+
+ runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher);
+
+ // Need to call complete to workaround bug in YARN AMRMClient
+ if (provisionRequest.containerAcquired()) {
+ amClient.completeContainerRequest(provisionRequest.getRequestId());
+ }
+
+ if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName)) {
+ LOG.info("Runnable " + runnableName + " fully provisioned with " + containerCount + " instances.");
+ provisioning.poll();
+ }
+ }
+ }
+
+ private List<LocalFile> getLocalizeFiles() {
+ try {
+ Reader reader = Files.newReader(new File(Constants.Files.LOCALIZE_FILES), Charsets.UTF_8);
+ try {
+ return new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+ .create().fromJson(reader, new TypeToken<List<LocalFile>>() {}.getType());
+ } finally {
+ reader.close();
+ }
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ private String getZKNamespace(String runnableName) {
+ return String.format("/%s/runnables/%s", runId.getId(), runnableName);
+ }
+
+ private String getKafkaZKConnect() {
+ return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId());
+ }
+
+ private Properties generateKafkaConfig() {
+ int port = Networks.getRandomPort();
+ Preconditions.checkState(port > 0, "Failed to get random port.");
+
+ Properties prop = new Properties();
+ prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
+ prop.setProperty("zk.connect", getKafkaZKConnect());
+ prop.setProperty("num.threads", "8");
+ prop.setProperty("port", Integer.toString(port));
+ prop.setProperty("log.flush.interval", "10000");
+ prop.setProperty("max.socket.request.bytes", "104857600");
+ prop.setProperty("log.cleanup.interval.mins", "1");
+ prop.setProperty("log.default.flush.scheduler.interval.ms", "1000");
+ prop.setProperty("zk.connectiontimeout.ms", "1000000");
+ prop.setProperty("socket.receive.buffer", "1048576");
+ prop.setProperty("enable.zookeeper", "true");
+ prop.setProperty("log.retention.hours", "24");
+ prop.setProperty("brokerid", "0");
+ prop.setProperty("socket.send.buffer", "1048576");
+ prop.setProperty("num.partitions", "1");
+ prop.setProperty("log.file.size", "536870912");
+ prop.setProperty("log.default.flush.interval.ms", "1000");
+ return prop;
+ }
+
+ private ListenableFuture<String> processMessage(final String messageId, Message message) {
+ LOG.debug("Message received: {} {}.", messageId, message);
+
+ SettableFuture<String> result = SettableFuture.create();
+ Runnable completion = getMessageCompletion(messageId, result);
+
+ if (handleSecureStoreUpdate(message)) {
+ runningContainers.sendToAll(message, completion);
+ return result;
+ }
+
+ if (handleSetInstances(message, completion)) {
+ return result;
+ }
+
+ // Replicate messages to all runnables
+ if (message.getScope() == Message.Scope.ALL_RUNNABLE) {
+ runningContainers.sendToAll(message, completion);
+ return result;
+ }
+
+ // Replicate message to a particular runnable.
+ if (message.getScope() == Message.Scope.RUNNABLE) {
+ runningContainers.sendToRunnable(message.getRunnableName(), message, completion);
+ return result;
+ }
+
+ LOG.info("Message ignored. {}", message);
+ return Futures.immediateFuture(messageId);
+ }
+
+ /**
+ * Attempts to change the number of running instances.
+ * @return {@code true} if the message does requests for changes in number of running instances of a runnable,
+ * {@code false} otherwise.
+ */
+ private boolean handleSetInstances(final Message message, final Runnable completion) {
+ if (message.getType() != Message.Type.SYSTEM || message.getScope() != Message.Scope.RUNNABLE) {
+ return false;
+ }
+
+ Command command = message.getCommand();
+ Map<String, String> options = command.getOptions();
+ if (!"instances".equals(command.getCommand()) || !options.containsKey("count")) {
+ return false;
+ }
+
+ final String runnableName = message.getRunnableName();
+ if (runnableName == null || runnableName.isEmpty() || !twillSpec.getRunnables().containsKey(runnableName)) {
+ LOG.info("Unknown runnable {}", runnableName);
+ return false;
+ }
+
+ final int newCount = Integer.parseInt(options.get("count"));
+ final int oldCount = expectedContainers.getExpected(runnableName);
+
+ LOG.info("Received change instances request for {}, from {} to {}.", runnableName, oldCount, newCount);
+
+ if (newCount == oldCount) { // Nothing to do, simply complete the request.
+ completion.run();
+ return true;
+ }
+
+ instanceChangeExecutor.execute(createSetInstanceRunnable(message, completion, oldCount, newCount));
+ return true;
+ }
+
+ /**
+ * Creates a Runnable for execution of change instance request.
+ */
+ private Runnable createSetInstanceRunnable(final Message message, final Runnable completion,
+ final int oldCount, final int newCount) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ final String runnableName = message.getRunnableName();
+
+ LOG.info("Processing change instance request for {}, from {} to {}.", runnableName, oldCount, newCount);
+ try {
+ // Wait until running container count is the same as old count
+ runningContainers.waitForCount(runnableName, oldCount);
+ LOG.info("Confirmed {} containers running for {}.", oldCount, runnableName);
+
+ expectedContainers.setExpected(runnableName, newCount);
+
+ try {
+ if (newCount < oldCount) {
+ // Shutdown some running containers
+ for (int i = 0; i < oldCount - newCount; i++) {
+ runningContainers.removeLast(runnableName);
+ }
+ } else {
+ // Increase the number of instances
+ runnableContainerRequests.add(createRunnableContainerRequest(runnableName));
+ }
+ } finally {
+ runningContainers.sendToRunnable(runnableName, message, completion);
+ LOG.info("Change instances request completed. From {} to {}.", oldCount, newCount);
+ }
+ } catch (InterruptedException e) {
+ // If the wait is being interrupted, discard the message.
+ completion.run();
+ }
+ }
+ };
+ }
+
+ private RunnableContainerRequest createRunnableContainerRequest(final String runnableName) {
+ // Find the current order of the given runnable in order to create a RunnableContainerRequest.
+ TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
+ @Override
+ public boolean apply(TwillSpecification.Order input) {
+ return (input.getNames().contains(runnableName));
+ }
+ });
+
+ RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+ Resource capability = createCapability(runtimeSpec.getResourceSpecification());
+ return new RunnableContainerRequest(order.getType(), ImmutableMultimap.of(capability, runtimeSpec));
+ }
+
+ private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ future.set(messageId);
+ }
+ };
+ }
+
+ private Resource createCapability(ResourceSpecification resourceSpec) {
+ Resource capability = Records.newRecord(Resource.class);
+
+ if (!YarnUtils.setVirtualCores(capability, resourceSpec.getVirtualCores())) {
+ LOG.debug("Virtual cores limit not supported.");
+ }
+
+ capability.setMemory(resourceSpec.getMemorySize());
+ return capability;
+ }
+
+ @Override
+ protected Service getServiceDelegate() {
+ return serviceDelegate;
+ }
+
+ /**
+ * A private class for service lifecycle. It's done this way so that we can have {@link ZKServiceDecorator} to
+ * wrap around this to reflect status in ZK.
+ */
+ private final class ServiceDelegate extends AbstractExecutionThreadService implements MessageCallback {
+
+ private volatile Thread runThread;
+
+ @Override
+ protected void run() throws Exception {
+ runThread = Thread.currentThread();
+ try {
+ doRun();
+ } catch (InterruptedException e) {
+ // It's ok to get interrupted exception, as it's a signal to stop
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ doStart();
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ doStop();
+ }
+
+ @Override
+ protected void triggerShutdown() {
+ Thread runThread = this.runThread;
+ if (runThread != null) {
+ runThread.interrupt();
+ }
+ }
+
+ @Override
+ public ListenableFuture<String> onReceived(String messageId, Message message) {
+ return processMessage(messageId, message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
new file mode 100644
index 0000000..931c5ef
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationSubmitter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.YarnApplicationReport;
+import org.apache.twill.internal.yarn.YarnLaunchContext;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Interface for submitting a new application to run.
+ */
+public interface ApplicationSubmitter {
+
+  /**
+   * Submits a new application.
+   *
+   * @param launchContext the launch context to use for the submitted application
+   * @param capability the resource capability to request for the submission
+   * @return a {@link ProcessController} whose reports are {@link YarnApplicationReport}s for the submission
+   */
+  ProcessController<YarnApplicationReport> submit(YarnLaunchContext launchContext, Resource capability);
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
new file mode 100644
index 0000000..1769910
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/BasicEventHandlerContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.EventHandlerContext;
+import org.apache.twill.api.EventHandlerSpecification;
+
+/**
+ * Minimal {@link EventHandlerContext} that simply exposes the {@link EventHandlerSpecification}
+ * it was constructed with.
+ */
+final class BasicEventHandlerContext implements EventHandlerContext {
+
+  private final EventHandlerSpecification specification;
+
+  /**
+   * @param specification the value subsequently returned by {@link #getSpecification()}
+   */
+  BasicEventHandlerContext(EventHandlerSpecification specification) {
+    this.specification = specification;
+  }
+
+  @Override
+  public EventHandlerSpecification getSpecification() {
+    return this.specification;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
new file mode 100644
index 0000000..f4ebbd0
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/ExpectedContainers.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * Holds the expected container count for each runnable. It also keeps track of the time
+ * (as given by {@link System#currentTimeMillis()}) at which each expected count was last set or refreshed.
+ *
+ * <p>After construction, the underlying map is only accessed through {@code synchronized} methods.</p>
+ */
+final class ExpectedContainers {
+
+  // Runnable name -> expected count plus the time that count was recorded.
+  private final Map<String, ExpectedCount> expectedCounts;
+
+  /**
+   * @param expected initial expected container count per runnable name; all initial entries share one timestamp
+   */
+  ExpectedContainers(Map<String, Integer> expected) {
+    long createdAt = System.currentTimeMillis();
+    expectedCounts = Maps.newHashMap();
+    for (Map.Entry<String, Integer> initial : expected.entrySet()) {
+      expectedCounts.put(initial.getKey(), new ExpectedCount(initial.getValue(), createdAt));
+    }
+  }
+
+  /**
+   * Sets the expected count for a runnable, stamped with the current time.
+   */
+  synchronized void setExpected(String runnable, int expected) {
+    expectedCounts.put(runnable, stampedNow(expected));
+  }
+
+  /**
+   * Refreshes the timestamp of each given runnable to the current time, keeping its expected count.
+   * Each name must already be present; an unknown name results in a {@link NullPointerException}.
+   *
+   * @param runnables names of the runnables whose timestamps are refreshed
+   */
+  synchronized void updateRequestTime(Iterable<String> runnables) {
+    for (String runnable : runnables) {
+      int currentCount = expectedCounts.get(runnable).getCount();
+      expectedCounts.put(runnable, stampedNow(currentCount));
+    }
+  }
+
+  /**
+   * Returns the expected count of the given runnable; throws {@link NullPointerException} if it is unknown.
+   */
+  synchronized int getExpected(String runnable) {
+    ExpectedCount entry = expectedCounts.get(runnable);
+    return entry.getCount();
+  }
+
+  /**
+   * Returns an immutable snapshot of the expected counts of all runnables.
+   */
+  synchronized Map<String, ExpectedCount> getAll() {
+    return ImmutableMap.copyOf(expectedCounts);
+  }
+
+  // Wraps the given count with the current wall-clock time.
+  private static ExpectedCount stampedNow(int count) {
+    return new ExpectedCount(count, System.currentTimeMillis());
+  }
+
+  /**
+   * Immutable pair of an expected container count and the time at which it was recorded.
+   */
+  static final class ExpectedCount {
+    private final int count;
+    private final long timestamp;
+
+    private ExpectedCount(int count, long timestamp) {
+      this.count = count;
+      this.timestamp = timestamp;
+    }
+
+    int getCount() {
+      return count;
+    }
+
+    /** Milliseconds from {@link System#currentTimeMillis()} when this count was recorded. */
+    long getTimestamp() {
+      return timestamp;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
new file mode 100644
index 0000000..2d41aa6
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/LoggerContextListenerAdapter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.LoggerContextListener;
+
+/**
+ * Adapter for logback's {@link LoggerContextListener} that implements every callback as a no-op,
+ * so subclasses override only the callbacks they care about. The reset-resistant flag is fixed at construction.
+ */
+abstract class LoggerContextListenerAdapter implements LoggerContextListener {
+
+  private final boolean resetResistant;
+
+  /**
+   * @param resetResistant the value returned by {@link #isResetResistant()}
+   */
+  protected LoggerContextListenerAdapter(boolean resetResistant) {
+    this.resetResistant = resetResistant;
+  }
+
+  @Override
+  public final boolean isResetResistant() {
+    return this.resetResistant;
+  }
+
+  /** No-op by default. */
+  @Override
+  public void onStart(LoggerContext context) {
+    // Intentionally empty.
+  }
+
+  /** No-op by default. */
+  @Override
+  public void onReset(LoggerContext context) {
+    // Intentionally empty.
+  }
+
+  /** No-op by default. */
+  @Override
+  public void onStop(LoggerContext context) {
+    // Intentionally empty.
+  }
+
+  /** No-op by default. */
+  @Override
+  public void onLevelChange(Logger logger, Level level) {
+    // Intentionally empty.
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
new file mode 100644
index 0000000..002d2a5
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.RuntimeSpecification;
+
+/**
+ * Package private class that helps the AM track an in-progress container request: the runtime
+ * specification it is for, its request id, and how many containers are still outstanding.
+ */
+final class ProvisionRequest {
+  private final RuntimeSpecification runtimeSpec;
+  private final String requestId;
+  // Number of containers still expected for this request; decremented by containerAcquired().
+  private int requestCount;
+
+  /**
+   * @param runtimeSpec the runtime specification the containers are requested for
+   * @param requestId identifier of this request
+   * @param requestCount number of containers requested
+   */
+  ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount) {
+    this.runtimeSpec = runtimeSpec;
+    this.requestId = requestId;
+    this.requestCount = requestCount;
+  }
+
+  RuntimeSpecification getRuntimeSpec() {
+    return runtimeSpec;
+  }
+
+  String getRequestId() {
+    return requestId;
+  }
+
+  /**
+   * Called to notify that a container has been provisioned for this request.
+   *
+   * @return {@code true} exactly when this call brings the outstanding count to zero, i.e. all requested
+   *         containers have now been provisioned; any later call returns {@code false}
+   */
+  boolean containerAcquired() {
+    return --requestCount == 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
new file mode 100644
index 0000000..7f28443
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillSpecification;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Data structure for holding a set of runnable specifications grouped by resource capability.
+ * The groups are handed out one at a time through {@link #takeRequest()}.
+ */
+final class RunnableContainerRequest {
+  private final TwillSpecification.Order.Type orderType;
+  // Lazily walks the multimap's asMap() view; each takeRequest() call advances it by one entry.
+  private final Iterator<Map.Entry<Resource, Collection<RuntimeSpecification>>> requests;
+
+  /**
+   * @param orderType the order type of this set of requests
+   * @param requests runtime specifications keyed by the resource capability they need. The multimap is
+   *                 iterated lazily, so presumably it must not be modified afterwards (TODO confirm with callers).
+   */
+  RunnableContainerRequest(TwillSpecification.Order.Type orderType,
+                           Multimap<Resource, RuntimeSpecification> requests) {
+    this.orderType = orderType;
+    this.requests = requests.asMap().entrySet().iterator();
+  }
+
+  TwillSpecification.Order.Type getOrderType() {
+    return orderType;
+  }
+
+  /**
+   * Removes the next resource request and returns it.
+   *
+   * @return the {@link Resource} together with an immutable copy of its {@link RuntimeSpecification}s, or
+   *         {@code null} if there are no more requests
+   */
+  Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> takeRequest() {
+    Map.Entry<Resource, Collection<RuntimeSpecification>> head = Iterators.getNext(requests, null);
+    if (head == null) {
+      return null;
+    }
+    return Maps.immutableEntry(head.getKey(), ImmutableList.copyOf(head.getValue()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
new file mode 100644
index 0000000..b4b27a9
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.internal.EnvKeys;
+import org.apache.twill.internal.ProcessController;
+import org.apache.twill.internal.yarn.AbstractYarnProcessLauncher;
+import org.apache.twill.internal.yarn.YarnContainerInfo;
+import org.apache.twill.internal.yarn.YarnLaunchContext;
+import org.apache.twill.internal.yarn.YarnNMClient;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Launches a runnable's process inside an allocated YARN container using a {@link YarnNMClient}.
+ */
+public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<YarnContainerInfo> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RunnableProcessLauncher.class);
+
+  private final YarnContainerInfo containerInfo;
+  private final YarnNMClient nmClient;
+  // Becomes true once nmClient.start(...) has returned without throwing.
+  private boolean launched;
+
+  public RunnableProcessLauncher(YarnContainerInfo containerInfo, YarnNMClient nmClient) {
+    super(containerInfo);
+    this.containerInfo = containerInfo;
+    this.nmClient = nmClient;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this).add("container", containerInfo).toString();
+  }
+
+  /**
+   * Adds container details (id, host, port, memory, virtual cores) to the launch environment and starts
+   * the process through the node manager client.
+   *
+   * @return a controller whose {@code getReport()} always returns {@code null} and whose {@code cancel()}
+   *         cancels the started container
+   */
+  @Override
+  protected <R> ProcessController<R> doLaunch(YarnLaunchContext launchContext) {
+    launchContext.setEnvironment(containerEnvironment(launchContext));
+
+    LOG.info("Launching in container {}, {}", containerInfo.getId(), launchContext.getCommands());
+    final Cancellable startedContainer = nmClient.start(containerInfo, launchContext);
+    launched = true;
+
+    return new ProcessController<R>() {
+      @Override
+      public R getReport() {
+        // No reporting support for runnable launch yet.
+        return null;
+      }
+
+      @Override
+      public void cancel() {
+        startedContainer.cancel();
+      }
+    };
+  }
+
+  /**
+   * Returns {@code true} once {@link #doLaunch} has successfully started the container.
+   */
+  public boolean isLaunched() {
+    return launched;
+  }
+
+  // Copies the launch context's environment and adds the container-specific entries.
+  private Map<String, String> containerEnvironment(YarnLaunchContext launchContext) {
+    Map<String, String> env = Maps.newHashMap(launchContext.getEnvironment());
+    env.put(EnvKeys.YARN_CONTAINER_ID, containerInfo.getId());
+    env.put(EnvKeys.YARN_CONTAINER_HOST, containerInfo.getHost().getHostName());
+    env.put(EnvKeys.YARN_CONTAINER_PORT, Integer.toString(containerInfo.getPort()));
+    env.put(EnvKeys.YARN_CONTAINER_MEMORY_MB, Integer.toString(containerInfo.getMemoryMB()));
+    env.put(EnvKeys.YARN_CONTAINER_VIRTUAL_CORES, Integer.toString(containerInfo.getVirtualCores()));
+    return env;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
new file mode 100644
index 0000000..beef0d4
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.ServiceController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.internal.ContainerInfo;
+import org.apache.twill.internal.DefaultResourceReport;
+import org.apache.twill.internal.DefaultTwillRunResources;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.TwillContainerController;
+import org.apache.twill.internal.TwillContainerLauncher;
+import org.apache.twill.internal.container.TwillContainerMain;
+import org.apache.twill.internal.state.Message;
+import org.apache.twill.internal.yarn.YarnContainerStatus;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Table;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A helper class for ApplicationMasterService to keep track of running containers and to interact
+ * with them.
+ */
+final class RunningContainers {
+ private static final Logger LOG = LoggerFactory.getLogger(RunningContainers.class);
+
+ /**
+ * Function to return cardinality of a given BitSet.
+ */
+ private static final Function<BitSet, Integer> BITSET_CARDINALITY = new Function<BitSet, Integer>() {
+ @Override
+ public Integer apply(BitSet input) {
+ return input.cardinality();
+ }
+ };
+
+ // Table of <runnableName, containerId, controller>
+ private final Table<String, String, TwillContainerController> containers;
+
+ // Map from runnableName to a BitSet, with the <instanceId> bit turned on for having an instance running.
+ private final Map<String, BitSet> runnableInstances;
+ private final DefaultResourceReport resourceReport;
+ private final Deque<String> startSequence;
+ private final Lock containerLock;
+ private final Condition containerChange;
+
+ RunningContainers(String appId, TwillRunResources appMasterResources) {
+ containers = HashBasedTable.create();
+ runnableInstances = Maps.newHashMap();
+ startSequence = Lists.newLinkedList();
+ containerLock = new ReentrantLock();
+ containerChange = containerLock.newCondition();
+ resourceReport = new DefaultResourceReport(appId, appMasterResources);
+ }
+
+ /**
+ * Returns {@code true} if there is no live container.
+ */
+ boolean isEmpty() {
+ containerLock.lock();
+ try {
+ return runnableInstances.isEmpty();
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) {
+ containerLock.lock();
+ try {
+ int instanceId = getStartInstanceId(runnableName);
+ RunId runId = getRunId(runnableName, instanceId);
+ TwillContainerController controller = launcher.start(runId, instanceId,
+ TwillContainerMain.class, "$HADOOP_CONF_DIR");
+ containers.put(runnableName, containerInfo.getId(), controller);
+
+ TwillRunResources resources = new DefaultTwillRunResources(instanceId,
+ containerInfo.getId(),
+ containerInfo.getVirtualCores(),
+ containerInfo.getMemoryMB(),
+ containerInfo.getHost().getHostName());
+ resourceReport.addRunResources(runnableName, resources);
+
+ if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) {
+ startSequence.addLast(runnableName);
+ }
+ containerChange.signalAll();
+
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ ResourceReport getResourceReport() {
+ return resourceReport;
+ }
+
+ /**
+ * Stops and removes the last running container of the given runnable.
+ */
+ void removeLast(String runnableName) {
+ containerLock.lock();
+ try {
+ int maxInstanceId = getMaxInstanceId(runnableName);
+ if (maxInstanceId < 0) {
+ LOG.warn("No running container found for {}", runnableName);
+ return;
+ }
+
+ String lastContainerId = null;
+ TwillContainerController lastController = null;
+
+ // Find the controller with the maxInstanceId
+ for (Map.Entry<String, TwillContainerController> entry : containers.row(runnableName).entrySet()) {
+ if (getInstanceId(entry.getValue().getRunId()) == maxInstanceId) {
+ lastContainerId = entry.getKey();
+ lastController = entry.getValue();
+ break;
+ }
+ }
+
+ Preconditions.checkState(lastContainerId != null,
+ "No container found for {} with instanceId = {}", runnableName, maxInstanceId);
+
+ LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
+ lastController.stopAndWait();
+ containers.remove(runnableName, lastContainerId);
+ removeInstanceId(runnableName, maxInstanceId);
+ resourceReport.removeRunnableResources(runnableName, lastContainerId);
+ containerChange.signalAll();
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Blocks until there are changes in running containers.
+ */
+ void waitForCount(String runnableName, int count) throws InterruptedException {
+ containerLock.lock();
+ try {
+ while (getRunningInstances(runnableName) != count) {
+ containerChange.await();
+ }
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the number of running instances of the given runnable.
+ */
+ int count(String runnableName) {
+ containerLock.lock();
+ try {
+ return getRunningInstances(runnableName);
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Returns a Map contains running instances of all runnables.
+ */
+ Map<String, Integer> countAll() {
+ containerLock.lock();
+ try {
+ return ImmutableMap.copyOf(Maps.transformValues(runnableInstances, BITSET_CARDINALITY));
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ void sendToAll(Message message, Runnable completion) {
+ containerLock.lock();
+ try {
+ if (containers.isEmpty()) {
+ completion.run();
+ }
+
+ // Sends the command to all running containers
+ AtomicInteger count = new AtomicInteger(containers.size());
+ for (Map.Entry<String, Map<String, TwillContainerController>> entry : containers.rowMap().entrySet()) {
+ for (TwillContainerController controller : entry.getValue().values()) {
+ sendMessage(entry.getKey(), message, controller, count, completion);
+ }
+ }
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ void sendToRunnable(String runnableName, Message message, Runnable completion) {
+ containerLock.lock();
+ try {
+ Collection<TwillContainerController> controllers = containers.row(runnableName).values();
+ if (controllers.isEmpty()) {
+ completion.run();
+ }
+
+ AtomicInteger count = new AtomicInteger(controllers.size());
+ for (TwillContainerController controller : controllers) {
+ sendMessage(runnableName, message, controller, count, completion);
+ }
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Stops all running services. Only called when the AppMaster stops.
+ */
+ void stopAll() {
+ containerLock.lock();
+ try {
+ // Stop it one by one in reverse order of start sequence
+ Iterator<String> itor = startSequence.descendingIterator();
+ List<ListenableFuture<ServiceController.State>> futures = Lists.newLinkedList();
+ while (itor.hasNext()) {
+ String runnableName = itor.next();
+ LOG.info("Stopping all instances of " + runnableName);
+
+ futures.clear();
+ // Parallel stops all running containers of the current runnable.
+ for (TwillContainerController controller : containers.row(runnableName).values()) {
+ futures.add(controller.stop());
+ }
+ // Wait for containers to stop. Assumes the future returned by Futures.successfulAsList won't throw exception.
+ Futures.getUnchecked(Futures.successfulAsList(futures));
+
+ LOG.info("Terminated all instances of " + runnableName);
+ }
+ containers.clear();
+ runnableInstances.clear();
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ Set<String> getContainerIds() {
+ containerLock.lock();
+ try {
+ return ImmutableSet.copyOf(containers.columnKeySet());
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Handle completion of container.
+ * @param status The completion status.
+ * @param restartRunnables Set of runnable names that requires restart.
+ */
+ void handleCompleted(YarnContainerStatus status, Multiset<String> restartRunnables) {
+ containerLock.lock();
+ String containerId = status.getContainerId();
+ int exitStatus = status.getExitStatus();
+ ContainerState state = status.getState();
+
+ try {
+ Map<String, TwillContainerController> lookup = containers.column(containerId);
+ if (lookup.isEmpty()) {
+ // It's OK because if a container is stopped through removeLast, this would be empty.
+ return;
+ }
+
+ if (lookup.size() != 1) {
+ LOG.warn("More than one controller found for container {}", containerId);
+ }
+
+ if (exitStatus != 0) {
+ LOG.warn("Container {} exited abnormally with state {}, exit code {}. Re-request the container.",
+ containerId, state, exitStatus);
+ restartRunnables.add(lookup.keySet().iterator().next());
+ } else {
+ LOG.info("Container {} exited normally with state {}", containerId, state);
+ }
+
+ for (Map.Entry<String, TwillContainerController> completedEntry : lookup.entrySet()) {
+ String runnableName = completedEntry.getKey();
+ TwillContainerController controller = completedEntry.getValue();
+ controller.completed(exitStatus);
+
+ removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
+ resourceReport.removeRunnableResources(runnableName, containerId);
+ }
+
+ lookup.clear();
+ containerChange.signalAll();
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
+ /**
+ * Sends a command through the given {@link org.apache.twill.internal.TwillContainerController} of a runnable. Decrements the count
+ * when the sending of command completed. Triggers completion when count reaches zero.
+ */
+ private void sendMessage(final String runnableName, final Message message,
+ final TwillContainerController controller, final AtomicInteger count,
+ final Runnable completion) {
+ Futures.addCallback(controller.sendMessage(message), new FutureCallback<Message>() {
+ @Override
+ public void onSuccess(Message result) {
+ if (count.decrementAndGet() == 0) {
+ completion.run();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ try {
+ LOG.error("Failed to send message. Runnable: {}, RunId: {}, Message: {}.",
+ runnableName, controller.getRunId(), message, t);
+ } finally {
+ if (count.decrementAndGet() == 0) {
+ completion.run();
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Returns the instanceId to start the given runnable.
+ */
+ private int getStartInstanceId(String runnableName) {
+ BitSet instances = runnableInstances.get(runnableName);
+ if (instances == null) {
+ instances = new BitSet();
+ runnableInstances.put(runnableName, instances);
+ }
+ int instanceId = instances.nextClearBit(0);
+ instances.set(instanceId);
+ return instanceId;
+ }
+
+ private void removeInstanceId(String runnableName, int instanceId) {
+ BitSet instances = runnableInstances.get(runnableName);
+ if (instances == null) {
+ return;
+ }
+ instances.clear(instanceId);
+ if (instances.isEmpty()) {
+ runnableInstances.remove(runnableName);
+ }
+ }
+
+ /**
+ * Returns the largest instanceId for the given runnable. Returns -1 if no container is running.
+ */
+ private int getMaxInstanceId(String runnableName) {
+ BitSet instances = runnableInstances.get(runnableName);
+ if (instances == null || instances.isEmpty()) {
+ return -1;
+ }
+ return instances.length() - 1;
+ }
+
+ /**
+ * Returns nnumber of running instances for the given runnable.
+ */
+ private int getRunningInstances(String runableName) {
+ BitSet instances = runnableInstances.get(runableName);
+ return instances == null ? 0 : instances.cardinality();
+ }
+
+ private RunId getRunId(String runnableName, int instanceId) {
+ RunId baseId;
+
+ Collection<TwillContainerController> controllers = containers.row(runnableName).values();
+ if (controllers.isEmpty()) {
+ baseId = RunIds.generate();
+ } else {
+ String id = controllers.iterator().next().getRunId().getId();
+ baseId = RunIds.fromString(id.substring(0, id.lastIndexOf('-')));
+ }
+
+ return RunIds.fromString(baseId.getId() + '-' + instanceId);
+ }
+
+ private int getInstanceId(RunId runId) {
+ String id = runId.getId();
+ return Integer.parseInt(id.substring(id.lastIndexOf('-') + 1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
new file mode 100644
index 0000000..ca299e0
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/TrackerService.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.internal.json.ResourceReportAdapter;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpContentCompressor;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.netty.util.CharsetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Webservice that the Application Master will register back to the resource manager
+ * for clients to track application progress. Currently used purely for getting a
+ * breakdown of resource usage as a {@link org.apache.twill.api.ResourceReport}.
+ */
+public final class TrackerService extends AbstractIdleService {
+
+  // TODO: This is temporary. Once more REST APIs are supported, this will be moved elsewhere.
+  public static final String PATH = "/resources";
+
+  private static final Logger LOG = LoggerFactory.getLogger(TrackerService.class);
+  private static final int NUM_BOSS_THREADS = 1;
+  // Seconds to wait for channels to close (see usages with TimeUnit.SECONDS).
+  private static final int CLOSE_CHANNEL_TIMEOUT = 5;
+  // Maximum aggregated request size passed to HttpChunkAggregator (100 * 1024 * 1024 bytes).
+  private static final int MAX_INPUT_SIZE = 100 * 1024 * 1024;
+
+  private final String host;
+  private ServerBootstrap bootstrap;
+  private InetSocketAddress bindAddress;
+  private URL url;
+  private final ChannelGroup channelGroup;
+  private final ResourceReport resourceReport;
+
+  /**
+   * Initialize the service.
+   *
+   * @param resourceReport live report that the service will return to clients.
+   * @param appMasterHost the application master host.
+   */
+  public TrackerService(ResourceReport resourceReport, String appMasterHost) {
+    this.channelGroup = new DefaultChannelGroup("appMasterTracker");
+    this.resourceReport = resourceReport;
+    this.host = appMasterHost;
+  }
+
+  /**
+   * Returns the address this tracker service is bound to, or {@code null} if the service
+   * has not been started yet.
+   */
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  /**
+   * @return tracker url, or {@code null} if the service has not been started yet.
+   */
+  public URL getUrl() {
+    return url;
+  }
+
+  /**
+   * Starts a Netty HTTP server on an ephemeral port of the application master host and computes
+   * the tracker URL from the bound port. On failure, every resource acquired here is released
+   * before the exception is rethrown.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    Executor bossThreads = Executors.newFixedThreadPool(NUM_BOSS_THREADS,
+                                                        new ThreadFactoryBuilder()
+                                                          .setDaemon(true)
+                                                          .setNameFormat("boss-thread")
+                                                          .build());
+
+    Executor workerThreads = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+                                                             .setDaemon(true)
+                                                             .setNameFormat("worker-thread#%d")
+                                                             .build());
+
+    ChannelFactory factory = new NioServerSocketChannelFactory(bossThreads, workerThreads);
+
+    bootstrap = new ServerBootstrap(factory);
+
+    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
+      public ChannelPipeline getPipeline() {
+        ChannelPipeline pipeline = Channels.pipeline();
+
+        pipeline.addLast("decoder", new HttpRequestDecoder());
+        pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_INPUT_SIZE));
+        pipeline.addLast("encoder", new HttpResponseEncoder());
+        pipeline.addLast("compressor", new HttpContentCompressor());
+        pipeline.addLast("handler", new ReportHandler(resourceReport));
+
+        return pipeline;
+      }
+    });
+
+    try {
+      Channel channel = bootstrap.bind(new InetSocketAddress(host, 0));
+      // Register the server channel immediately so it is covered by every close path.
+      channelGroup.add(channel);
+      bindAddress = (InetSocketAddress) channel.getLocalAddress();
+      url = URI.create(String.format("http://%s:%d", host, bindAddress.getPort()))
+        .resolve(TrackerService.PATH).toURL();
+    } catch (Exception e) {
+      // Guava's AbstractIdleService does not call shutDown() after a failed startUp(), so the
+      // bound channel (if any) and the Netty thread pools must be released here.
+      if (!channelGroup.close().awaitUninterruptibly(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) {
+        LOG.warn("Timeout when closing all channels after failed startup.");
+      }
+      bootstrap.releaseExternalResources();
+      throw e;
+    }
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    try {
+      if (!channelGroup.close().await(CLOSE_CHANNEL_TIMEOUT, TimeUnit.SECONDS)) {
+        LOG.warn("Timeout when closing all channels.");
+      }
+    } finally {
+      bootstrap.releaseExternalResources();
+    }
+  }
+
+  /**
+   * Handler to return resources used by this application master, which will be available through
+   * the host and port set when this application master registered itself to the resource manager.
+   */
+  public class ReportHandler extends SimpleChannelUpstreamHandler {
+    private final ResourceReport report;
+    private final ResourceReportAdapter reportAdapter;
+
+    public ReportHandler(ResourceReport report) {
+      this.report = report;
+      this.reportAdapter = ResourceReportAdapter.create();
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+      // The HttpChunkAggregator placed before this handler delivers complete HttpRequest messages.
+      HttpRequest request = (HttpRequest) e.getMessage();
+      if (!isValid(request)) {
+        write404(e);
+        return;
+      }
+
+      writeResponse(e);
+    }
+
+    // Only accepts GET on /resources for now. A query string, if present, is ignored.
+    private boolean isValid(HttpRequest request) {
+      return HttpMethod.GET.equals(request.getMethod()) && PATH.equals(getPath(request.getUri()));
+    }
+
+    // Returns the request URI without its query component.
+    private String getPath(String uri) {
+      int queryStart = uri.indexOf('?');
+      return queryStart < 0 ? uri : uri.substring(0, queryStart);
+    }
+
+    private void write404(MessageEvent e) {
+      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
+      // The connection is closed after every response, so tell the client explicitly.
+      response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+      HttpHeaders.setContentLength(response, 0);
+      ChannelFuture future = e.getChannel().write(response);
+      future.addListener(ChannelFutureListener.CLOSE);
+    }
+
+    private void writeResponse(MessageEvent e) {
+      HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=UTF-8");
+      // The connection is closed after every response, so tell the client explicitly.
+      response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+      ChannelBuffer content = ChannelBuffers.dynamicBuffer();
+      Writer writer = new OutputStreamWriter(new ChannelBufferOutputStream(content), CharsetUtil.UTF_8);
+      reportAdapter.toJson(report, writer);
+      try {
+        // Closing flushes any characters still buffered in the writer into the ChannelBuffer.
+        writer.close();
+      } catch (IOException e1) {
+        LOG.error("error writing resource report", e1);
+      }
+      response.setContent(content);
+      HttpHeaders.setContentLength(response, content.readableBytes());
+      ChannelFuture future = e.getChannel().write(response);
+      future.addListener(ChannelFutureListener.CLOSE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+      // Log instead of silently dropping the failure, then close the offending connection.
+      LOG.warn("Exception while serving resource report; closing channel.", e.getCause());
+      e.getChannel().close();
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1925ffaf/yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
----------------------------------------------------------------------
diff --git a/yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java b/yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
new file mode 100644
index 0000000..bf8e677
--- /dev/null
+++ b/yarn/src/main/java/org/apache/twill/internal/appmaster/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the implementation of the Twill application master.
+ */
+package org.apache.twill.internal.appmaster;