You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/03/04 17:13:47 UTC

[07/12] drill git commit: DRILL-1170: YARN integration for Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java
new file mode 100644
index 0000000..73a045f
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Abstract base class for schedulers that work with persistent
+ * (long-running) tasks. Such tasks are intended to run until
+ * explicitly shut down (unlike batch tasks that run until
+ * some expected completion.)
+ * <p>
+ * Provides a target quantity of tasks
+ * (see {@link #getTarget()}, along with operations to increase,
+ * decrease or set the target number.
+ * <p>
+ * The scheduler acts as a controller: starting new tasks as needed to
+ * match the desired target, or stopping tasks as needed when the
+ * target level is reduced.
+ */
+
+public abstract class PersistentTaskScheduler extends AbstractScheduler {
+  private static final Log LOG = LogFactory.getLog(PersistentTaskScheduler.class);
+  protected int quantity;
+
+  public PersistentTaskScheduler(String type, String name, int quantity) {
+    super(type, name);
+    this.quantity = quantity;
+  }
+
+  /**
+   * Set the number of running tasks to the quantity given.
+   *
+   * @param level
+   *          the target number of tasks
+   */
+
+  @Override
+  public int resize(int level) {
+    quantity = level;
+    if (quantity < 0) {
+      quantity = 0;
+    }
+    return quantity;
+  }
+
+  @Override
+  public int getTarget() { return quantity; }
+
+  /**
+   * Indicate that a task is completed. Normally occurs only
+   * when shutting down excess tasks.
+   *
+   * @param task
+   */
+
+
+  @Override
+  public void completed(Task task) { }
+
+  /**
+   * Progress for persistent tasks defaults to the ratio of
+   * running tasks to target level. Thus, a persistent cluster
+   * will normally report 100% progress.
+   *
+   * @return
+   */
+
+  @Override
+  public int[] getProgress() {
+    int activeCount = state.getTaskCount();
+    return new int[] { Math.min(activeCount, quantity), quantity };
+  }
+
+  /**
+   * Adjust the number of running tasks to better match the target
+   * by starting or stopping tasks as needed.
+   */
+
+  @Override
+  public void adjust() {
+    int activeCount = state.getTaskCount();
+    int delta = quantity - activeCount;
+    if (delta > 0) {
+      addTasks(delta);
+    } else if (delta < 0) {
+      cancelTasks(activeCount);
+    }
+  }
+
+  /**
+   * Cancel the requested number of tasks. We exclude any tasks that are already
+   * in the process of being cancelled. Because we ignore those tasks, it might
+   * be that we want to reduce the task count, but there is nothing left to cancel.
+   *
+   * @param cancelCount
+   */
+
+  private void cancelTasks(int cancelCount) {
+    int cancelled = state.getCancelledTaskCount();
+    int cancellable = cancelCount - cancelled;
+    int n = cancellable - quantity;
+    LOG.info("[" + getName( ) + "] - Cancelling " + cancelCount +
+             " tasks. " + cancelled + " are already cancelled, " +
+             cancellable + " more will be cancelled.");
+    if (n <= 0) {
+      return;
+    }
+    for (Task task : state.getStartingTasks()) {
+      state.cancel(task);
+      if (--n == 0) {
+        return;
+      }
+    }
+    for (Task task : state.getActiveTasks()) {
+      state.cancel(task);
+      if (--n == 0) {
+        return;
+      }
+    }
+
+    // If we get here it means something has gotten out of whack.
+
+    LOG.error("Tried to cancel " + cancellable + " tasks, but " + n + " could not be cancelled.");
+    assert false;
+  }
+
+  /**
+   * The persistent scheduler has no fixed sequence of tasks to run, it launches
+   * a set and is never "done". For purposes of completion tracking claim we
+   * have no further tasks.
+   *
+   * @return false
+   */
+
+  @Override
+  public boolean hasMoreTasks() { return false; }
+
+  @Override
+  public void requestTimedOut() {
+
+    // We requested a node a while back, requested a container from YARN,
+    // but waited too long to receive it. Most likely cause is that we
+    // want a container on a node that either does not exist, or is too
+    // heavily loaded. (That is, we have a 3-node cluster and are requesting
+    // a 4th node. Or, we have 2 nodes but node 3 has insufficient resources.)
+    // In either case, we're not likely to ever get the container, so just
+    // reduce the target size to what we an get.
+
+    assert quantity > 0;
+    if (quantity == 0) {
+      LOG.error("Container timed out, but target quantity is already 0!");
+    } else {
+      quantity--;
+      LOG.info("Container request timed out. Reducing target container count by 1 to " + quantity);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java
new file mode 100644
index 0000000..7e1c9a3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Interface for objects that are polled on each
+ * controller clock tick in order to perform
+ * time-based tasks.
+ */
+
+public interface Pollable {
+  public void tick(long curTime);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PulseRunnable.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PulseRunnable.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PulseRunnable.java
new file mode 100644
index 0000000..81d5a5d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PulseRunnable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Clock driver that calls a callback once each pulse period. Used to react to
+ * time-based events such as timeouts, checking for changed files, etc.
+ * This is called a "pulse" because it is periodic, like your pulse. But,
+ * unlike the "heartbeat" between the AM and YARN or the AM and ZK,
+ * this is purely internal.
+ */
+
+public class PulseRunnable implements Runnable {
+  private static final Log LOG = LogFactory.getLog(PulseRunnable.class);
+
+  /**
+   * Interface implemented to receive calls on each clock "tick."
+   */
+
+  public interface PulseCallback {
+    void onTick(long curTime);
+  }
+
+  private final int pulsePeriod;
+  private final PulseRunnable.PulseCallback callback;
+  public AtomicBoolean isLive = new AtomicBoolean(true);
+
+  public PulseRunnable(int pulsePeriodMS,
+      PulseRunnable.PulseCallback callback) {
+    pulsePeriod = pulsePeriodMS;
+    this.callback = callback;
+  }
+
+  @Override
+  public void run() {
+    while (isLive.get()) {
+      try {
+        Thread.sleep(pulsePeriod);
+      } catch (InterruptedException e) {
+        break;
+      }
+      try {
+        callback.onTick(System.currentTimeMillis());
+      } catch (Exception e) {
+
+        // Ignore exceptions. Seems strange, but is required to allow
+        // graceful shutdown of the AM when errors occur. For example, we
+        // start tasks on tick events. If those tasks fail, the timer
+        // goes down. But, the timer is also needed to time out failed
+        // requests in order to bring down the AM. So, just log the error
+        // and soldier on.
+
+        LOG.error("Timer thread caught, ignored an exception", e);
+      }
+    }
+  }
+
+  public void stop() { isLive.set(false); }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java
new file mode 100644
index 0000000..ff29bdf
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Callback from the ZooKeeper registry to announce events
+ * related to Drillbit registration.
+ */
+
+public interface RegistryHandler {
+  void reserveHost(String hostName);
+
+  void releaseHost(String hostName);
+
+  void startAck(Task task, String propertyKey, Object value);
+
+  void completionAck(Task task, String endpointProperty);
+
+  void registryDown();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java
new file mode 100644
index 0000000..7f8be0c
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.drill.yarn.core.LaunchSpec;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * The scheduler describes the set of tasks to run. It provides the details
+ * required to launch each task and optionally a specification of the containers
+ * required to run the task.
+ * <p>
+ * Schedulers can manage batch task (which do their job and complete), or
+ * persistent tasks (which run until terminated.)
+ * <p>
+ * The scheduler tracks task completion (for batch tasks) and task levels (for
+ * persistent tasks.)
+ */
+
+public interface Scheduler {
+  public interface TaskManager {
+    int maxConcurrentAllocs();
+
+    LaunchSpec getLaunchSpec(Task task);
+
+    void allocated(EventContext context);
+
+    boolean stop(Task task);
+
+    void completed(EventContext context);
+
+    boolean isLive(EventContext context);
+  }
+
+  /**
+   * Controller-assigned priority for this scheduler. Used to differentiate
+   * container requests by scheduler.
+   *
+   * @param priority
+   */
+
+  void setPriority(int priority);
+
+  /**
+   * Register the state object that tracks tasks launched by this scheduler.
+   *
+   * @param state
+   */
+
+  void registerState(SchedulerState state);
+
+  String getName();
+
+  String getType();
+
+  /**
+   * Whether tasks from this scheduler should incorporate app startup/shutdown
+   * acknowledgements (acks) into the task lifecycle.
+   *
+   * @return
+   */
+
+  boolean isTracked();
+
+  TaskManager getTaskManager();
+
+  /**
+   * Get the desired number of running tasks.
+   *
+   * @return
+   */
+  int getTarget();
+
+  /**
+   * Increase (positive) or decrease (negative) the number of desired tasks by
+   * the given amount.
+   *
+   * @param delta
+   */
+  void change(int delta);
+
+  /**
+   * Set the number of desired tasks to the given level.
+   *
+   * @param level
+   * @return the actual resize level, which may be lower than the requested
+   * level if the system cannot provide the requested level
+   */
+
+  int resize(int level);
+
+  void completed(Task task);
+
+  /**
+   * Adjust the number of running tasks to better track the desired number.
+   * Starts or stops tasks using the {@link SchedulerState} registered with
+   * {@link #registerState(SchedulerState)}.
+   */
+
+  void adjust();
+
+  /**
+   * Return an estimate of progress given as a ratio of (work completed, total
+   * work).
+   *
+   * @return
+   */
+  int[] getProgress();
+
+  /**
+   * If this is a batch scheduler, whether all tasks for the batch have
+   * completed. If this is a persistent task scheduler, always returns false.
+   *
+   * @return true if the scheduler has more tasks to run, false if the
+   * scheduler has no more tasks or manages a set of long-running tasks
+   */
+  boolean hasMoreTasks();
+
+  /**
+   * For reporting, get the YARN resources requested by processes in
+   * this pool.
+   * @return
+   */
+
+  ContainerRequestSpec getResource( );
+
+  void limitContainerSize(Resource maxResource) throws AMException;
+
+  /**
+   * Maximum amount of time to wait when cancelling a job in the REQUESTING
+   * state. YARN will happily wait forever for a resource, this setting
+   * forcibly cancels the request at timeout.
+   *
+   * @return the number of seconds to wait for timeout. 0 means no timeout
+   */
+
+  int getRequestTimeoutSec();
+
+  /**
+   * Informs the scheduler that a YARN resource request timed out. The scheduler
+   * can either retry or (more productively) assume that the requested node is
+   * not available and adjust its target size downward.
+   */
+
+  void requestTimedOut();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java
new file mode 100644
index 0000000..7a1f8bd
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.List;
+
+/**
+ * The cluster state for tasks managed by a scheduler. Abstracts away the
+ * details of managing tasks, allowing the scheduler to work only with overall
+ * number of tasks.
+ */
+
+public interface SchedulerState {
+  /**
+   * The number of tasks in any active (non-ended) lifecycle state.
+   *
+   * @return
+   */
+
+  int getTaskCount();
+
+  /**
+   * The number of active tasks that have been cancelled, but have not yet
+   * ended.
+   *
+   * @return
+   */
+
+  int getCancelledTaskCount();
+
+  /**
+   * Returns the list of tasks awaiting a container request to be sent to YARN
+   * or for which a container request has been sent to YARN, but no container
+   * allocation has yet been received. Such tasks are simple to cancel. The list
+   * does not contain any tasks in this state which have previously been
+   * cancelled.
+   *
+   * @return
+   */
+
+  List<Task> getStartingTasks();
+
+  /**
+   * Returns the list of active tasks that have not yet been cancelled. Active
+   * tasks are any task for which a container has been assigned, but has not yet
+   * received a RM container completion event.
+   *
+   * @return
+   */
+
+  List<Task> getActiveTasks();
+
+  /**
+   * Start the given task.
+   *
+   * @param task
+   */
+
+  void start(Task task);
+
+  void cancel(Task task);
+
+  ClusterController getController();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java
new file mode 100644
index 0000000..65e8f2a
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+/**
+ * Represents the set of commands called by the cluster controller to manage the
+ * state of tasks within a task group. Each task group is managed by a
+ * scheduler.
+ */
+
+public interface SchedulerStateActions {
+  /**
+   * Returns the name of the scheduler associated with this task action group.
+   *
+   * @return
+   */
+
+  String getName();
+
+  /**
+   * Returns the scheduler associated with this task group.
+   *
+   * @return
+   */
+
+  Scheduler getScheduler();
+
+  /**
+   * Adjust the number of running tasks as needed to balance the number of
+   * running tasks with the desired number. May result in no change it the
+   * cluster is already in balance (or is in the process of achieving balance.)
+   */
+
+  void adjustTasks();
+
+  /**
+   * Request a container the first task that we wish to start.
+   */
+
+  boolean requestContainers(EventContext context, int maxRequests);
+
+  /**
+   * A container request has been granted. Match the container up with the first
+   * task waiting for a container and launch the task.
+   *
+   * @param context
+   * @param container
+   */
+
+  void containerAllocated(EventContext context, Container container);
+
+  /**
+   * Shut down this task group by canceling all tasks not already cancelled.
+   *
+   * @param context
+   */
+
+  void shutDown(EventContext context);
+
+  /**
+   * Determine if this task group is done. It is done when there are no active
+   * tasks and the controller itself is shutting down. This latter check
+   * differentiates the start state (when no tasks are active) from the end
+   * state. The AM will not shut down until all task groups are done.
+   *
+   * @return
+   */
+
+  boolean isDone();
+
+  int getTaskCount( );
+
+  int getLiveCount();
+
+  int getRequestCount( );
+
+  void visitTaskModels( TaskVisitor visitor );
+
+  void checkTasks(EventContext context, long curTime);
+
+  void cancel(Task task);
+
+  Task getTask(int id);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java
new file mode 100644
index 0000000..4c85cf3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Manages a the set of tasks associated with a scheduler. The scheduler decides
+ * which tasks to run or stop; the task group manages the life-cycle of the
+ * tasks for the given scheduler.
+ * <p>
+ * Schedulers, and hence their groups, define a priority. When starting, higher
+ * priority (lower priority value) groups run before lower priority groups.
+ * Similarly, when shrinking the cluster, lower priority groups shrink before
+ * higher priority groups.
+ */
+
+public final class SchedulerStateImpl
+    implements SchedulerState, SchedulerStateActions {
+  static final Log LOG = LogFactory.getLog(SchedulerStateImpl.class);
+
+  private final Scheduler scheduler;
+
+  private final ClusterControllerImpl controller;
+
+  /**
+   * Tracks the tasks to be started, but for which no work has yet been done.
+   * (State == PENDING).
+   */
+
+  protected List<Task> pendingTasks = new LinkedList<>();
+
+  /**
+   * Tracks the tasks for which containers have been requested. (State ==
+   * REQUESTED).
+   */
+
+  protected List<Task> allocatingTasks = new LinkedList<>();
+
+  /**
+   * Tracks running tasks: those that have been allocated containers and are
+   * starting, running, failed or ended. We use a map for this because, during
+   * these states, the task is identified by its container. (State == LAUNCHING,
+   * RUNNING or ENDING).
+   */
+
+  protected Map<ContainerId, Task> activeContainers = new HashMap<>();
+
+  public SchedulerStateImpl(ClusterControllerImpl controller,
+      Scheduler scheduler) {
+    this.controller = controller;
+    this.scheduler = scheduler;
+    scheduler.registerState(this);
+  }
+
+  @Override
+  public String getName() {
+    return scheduler.getName();
+  }
+
+  public int getMaxRetries() {
+    return controller.getMaxRetries();
+  }
+
+  public int getStopTimeoutMs() {
+    return controller.getStopTimeoutMs();
+  }
+
+  @Override
+  public Scheduler getScheduler() { return scheduler; }
+
+  /**
+   * Define a new task in this group. Adds it to the pending queue so that a
+   * container will be requested.
+   *
+   * @param task
+   */
+
+  @Override
+  public void start(Task task) {
+    assert task.getGroup() == null;
+    task.setGroup(this);
+    enqueuePendingRequest(task);
+  }
+
+  /**
+   * Put a task into the queue waiting to send a container request to YARN.
+   *
+   * @param task
+   */
+
+  public void enqueuePendingRequest(Task task) {
+    assert !activeContainers.containsValue(task);
+    assert !allocatingTasks.contains(task);
+    assert !pendingTasks.contains(task);
+    pendingTasks.add(task);
+
+    // Special initial-state notification
+
+    EventContext context = new EventContext(controller, task);
+    controller.fireLifecycleChange(TaskLifecycleListener.Event.CREATED,
+        context);
+  }
+
+  public int maxCurrentRequests() {
+    return this.scheduler.getTaskManager().maxConcurrentAllocs();
+  }
+
+  @Override
+  public boolean requestContainers(EventContext context, int maxRequests) {
+    if (pendingTasks.isEmpty()) {
+      return false;
+    }
+
+    // Limit the maximum number of requests to the limit set by
+    // the scheduler.
+
+    maxRequests = Math.min(maxRequests, maxCurrentRequests());
+
+    // Further limit requests to account for in-flight requests.
+
+    maxRequests -= allocatingTasks.size( );
+
+    // Request containers as long as there are pending tasks remaining.
+
+    for (int i = 0; i < maxRequests && !pendingTasks.isEmpty(); i++) {
+      context.setTask(pendingTasks.get(0));
+      context.getState().requestContainer(context);
+    }
+    return true;
+  }
+
+  /**
+   * Remove a task from the queue of tasks waiting to send a container request.
+   * The caller must put the task into the proper next state: the allocating
+   * queue or the completed task list.
+   *
+   * @param task
+   */
+
+  public void dequeuePendingRequest(Task task) {
+    assert !activeContainers.containsValue(task);
+    assert !allocatingTasks.contains(task);
+    assert pendingTasks.contains(task);
+    pendingTasks.remove(task);
+  }
+
+  /**
+   * Put a task onto the queue awaiting an allocation response from YARN.
+   *
+   * @param task
+   */
+
+  public void enqueueAllocatingTask(Task task) {
+    assert !activeContainers.containsValue(task);
+    assert !allocatingTasks.contains(task);
+    assert !pendingTasks.contains(task);
+    allocatingTasks.add(task);
+  }
+
+  @Override
+  public void containerAllocated(EventContext context, Container container) {
+    if (activeContainers.containsKey(container.getId())) {
+      LOG.error("Container allocated again: " + DoYUtil.labelContainer(container));
+      return;
+    }
+    if (allocatingTasks.isEmpty()) {
+
+      // Not sure why this happens. Maybe only in debug mode
+      // due stopping execution one thread while the RM
+      // heartbeat keeps sending our request over & over?
+      // One known case: the user requests a container. While YARN is
+      // considering the request, the user cancels the task.
+
+      LOG.warn("Releasing unwanted container: " + DoYUtil.labelContainer(container) );
+      context.yarn.releaseContainer(container);
+      return;
+    }
+    context.setTask(allocatingTasks.get(0));
+    context.getState().containerAllocated(context, container);
+  }
+
+  @Override
+  public void checkTasks(EventContext context, long curTime) {
+
+    // Iterate over tasks using a temporary list. The tick event may cause a timeout
+    // that turns around and modifies these lists.
+
+    List<Task> temp = new ArrayList<>( );
+    temp.addAll( allocatingTasks );
+    for (Task task : temp) {
+      context.setTask(task);
+      context.getState().tick(context, curTime);
+    }
+    temp.clear();
+    temp.addAll( pendingTasks );
+    for (Task task : temp) {
+      context.setTask(task);
+      context.getState().tick(context, curTime);
+    }
+    temp.clear();
+    temp.addAll( activeContainers.values( ) );
+    for (Task task : temp) {
+      context.setTask(task);
+      context.getState().tick(context, curTime);
+    }
+  }
+
+  /**
+   * Remove a task from the list of those waiting for a container allocation.
+   * The allocation may be done, or cancelled. The caller is responsible for
+   * moving the task to the next collection.
+   *
+   * @param task
+   */
+
+  public void dequeueAllocatingTask(Task task) {
+    assert allocatingTasks.contains(task);
+    allocatingTasks.remove(task);
+  }
+
+  /**
+   * Mark that a task has become active and should be tracked by its container
+   * ID. Prior to this, the task is not associated with a container.
+   *
+   * @param task
+   */
+
+  public void containerAllocated(Task task) {
+    assert !activeContainers.containsValue(task);
+    assert !allocatingTasks.contains(task);
+    assert !pendingTasks.contains(task);
+    activeContainers.put(task.getContainerId(), task);
+    controller.containerAllocated(task);
+  }
+
+  /**
+   * Mark that a task has completed: its container has expired or been revoked
+   * or the task has completed: successfully or a failure, as given by the
+   * task's disposition. The task can no longer be tracked by its container ID.
+   * If this is the last active task for this group, mark the group itself as
+   * completed.
+   *
+   * @param task
+   */
+
+  public void containerReleased(Task task) {
+    assert activeContainers.containsKey(task.getContainerId());
+    activeContainers.remove(task.getContainerId());
+    controller.containerReleased(task);
+  }
+
+  /**
+   * Mark that a task has completed successfully or a failure, as given by the
+   * task's disposition. If this is the last active task for this group, mark
+   * the group itself as completed.
+   *
+   * @param task
+   */
+
+  public void taskEnded(Task task) {
+    scheduler.completed(task);
+    controller.taskEnded(task);
+    if (isDone()) {
+      controller.taskGroupCompleted(this);
+    }
+    LOG.info(task.toString() + " - Task completed" );
+  }
+
+  /**
+   * Mark that a task is about to be retried. Task still retains its state from
+   * the current try.
+   *
+   * @param task
+   */
+
+  public void taskRetried(Task task) {
+    controller.taskRetried(task);
+  }
+
+  @Override
+  public void shutDown(EventContext context) {
+    for (Task task : getStartingTasks()) {
+      context.setTask(task);
+      context.getState().cancel(context);
+    }
+    for (Task task : getActiveTasks()) {
+      context.setTask(task);
+      context.getState().cancel(context);
+    }
+  }
+
+  /**
+   * Report if this task group has any tasks in the active part of their
+   * life-cycle: pending, allocating or active.
+   *
+   * @return
+   */
+
+  public boolean hasTasks() {
+    return getTaskCount() != 0;
+  }
+
+  @Override
+  public boolean isDone() {
+    return !hasTasks() && !scheduler.hasMoreTasks();
+  }
+
+  @Override
+  public void adjustTasks() {
+    scheduler.adjust();
+  }
+
+  /**
+   * Request a graceful stop of the task. Delegates to the task manager to do
+   * the actual work.
+   *
+   * @return true if the graceful stop request was sent, false if not, or if
+   *         this task type has no graceful stop
+   */
+
+  public boolean requestStop(Task task) {
+    return scheduler.getTaskManager().stop(task);
+  }
+
+  @Override
+  public int getTaskCount() {
+    return pendingTasks.size() + allocatingTasks.size()
+        + activeContainers.size();
+  }
+
+  @Override
+  public int getCancelledTaskCount() {
+
+    // TODO Crude first cut. This value should be maintained
+    // as a count.
+
+    int count = 0;
+    for (Task task : pendingTasks) {
+      if (task.isCancelled()) {
+        count++;
+      }
+    }
+    for (Task task : allocatingTasks) {
+      if (task.isCancelled()) {
+        count++;
+      }
+    }
+    for (Task task : activeContainers.values()) {
+      if (task.isCancelled()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public List<Task> getStartingTasks() {
+    List<Task> tasks = new ArrayList<>();
+    for (Task task : pendingTasks) {
+      if (!task.isCancelled()) {
+        tasks.add(task);
+      }
+    }
+    for (Task task : allocatingTasks) {
+      if (!task.isCancelled()) {
+        tasks.add(task);
+      }
+    }
+    return tasks;
+  }
+
+  @Override
+  public List<Task> getActiveTasks() {
+    List<Task> tasks = new ArrayList<>();
+    for (Task task : activeContainers.values()) {
+      if (!task.isCancelled()) {
+        tasks.add(task);
+      }
+    }
+    return tasks;
+  }
+
+  @Override
+  public void cancel(Task task) {
+    EventContext context = new EventContext(controller, task);
+    LOG.info( task.getLabel() + " Task cancelled" );
+    context.getState().cancel(context);
+  }
+
+  @Override
+  public int getLiveCount() {
+    int count = 0;
+    for (Task task : activeContainers.values()) {
+      if (task.isLive()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  @Override
+  public void visitTaskModels(TaskVisitor visitor) {
+    for (Task task : pendingTasks) {
+      visitor.visit(task);
+    }
+    for (Task task : allocatingTasks) {
+      visitor.visit(task);
+    }
+    for (Task task : activeContainers.values()) {
+      visitor.visit(task);
+    }
+  }
+
+  @Override
+  public Task getTask(int id) {
+    for (Task task : pendingTasks) {
+      if (task.getId() == id) {
+        return task;
+      }
+    }
+    for (Task task : allocatingTasks) {
+      if (task.getId() == id) {
+        return task;
+      }
+    }
+    for (Task task : activeContainers.values()) {
+      if (task.getId() == id) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public int getRequestCount() {
+    return allocatingTasks.size();
+  }
+
+  @Override
+  public ClusterController getController( ) { return controller; }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java
new file mode 100644
index 0000000..147f5f7
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.drill.yarn.core.LaunchSpec;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+
+/**
+ * AM-side state of individual containers. This class is mostly
+ * a holder of state. Behavior is provided by the
+ * {@link TaskState} subclasses.
+ */
+
+public class Task {
+  /**
+   * Tracking plugin state. A task can be untracked, or moves
+   * though states<br>
+   * NEW --> START_ACK --> END_ACK
+   * <p>
+   * Tracking state is separate from, but integrated with,
+   * task state. This is because, due to latency, tracking
+   * events may be slightly out of sync with YARN events.
+   */
+
+  public enum TrackingState
+  {
+    UNTRACKED( "N/A" ),
+    NEW( "Waiting" ),
+    START_ACK( "OK" ),
+    END_ACK( "Deregistered" );
+
+    private String displayName;
+
+    private TrackingState( String displayName ) {
+      this.displayName = displayName;
+    }
+
+    public String getDisplayName( ) { return displayName; }
+  }
+
+  public enum Disposition
+  {
+    CANCELLED, LAUNCH_FAILED, RUN_FAILED, COMPLETED, TOO_MANY_RETRIES, RETRIED
+  }
+
+  /**
+   * Maximum amount of time to wait when canceling a job in the REQUESTING
+   * state. YARN will happily wait forever for a resource, this setting allows
+   * the user to request to cancel a task, give YARN a while to respond, then
+   * forcibly cancel the job at timeout.
+   */
+
+  public static final long MAX_CANCELLATION_TIME = 10_000; // ms = 10s
+
+  /**
+   * Tasks receive a sequential internal task ID. Since all task
+   * creation is single-threaded, no additional concurrency controls
+   * are needed to protect this value.
+   */
+
+  private static volatile int taskCounter = 0;
+
+  /**
+   * Internal identifier for the task.
+   */
+
+  public final int taskId;
+
+
+  public final Scheduler scheduler;
+
+  /**
+   * Identifies the type of container needed and the details of the task to run.
+   */
+
+  public TaskSpec taskSpec;
+
+  /**
+   * The scheduler group that manages this task.
+   */
+
+  public SchedulerStateImpl taskGroup;
+
+  /**
+   * Tracking state for an additional task tracker (such as using
+   * ZooKeeper to track Drill-bits.)
+   */
+
+  protected TrackingState trackingState;
+
+  /**
+   * Tracks the container request between request and allocation. We must pass
+   * the container request back to YARN to remove it once it is allocated.
+   */
+
+  public ContainerRequest containerRequest;
+
+  /**
+   * The YARN container assigned to this task. The container is set only during
+   * the ALLOCATED, LAUNCHING, RUNNING and ENDING states.
+   */
+
+  public Container container;
+
+  /**
+   * Life-cycle state of this task.
+   */
+
+  protected TaskState state;
+
+  /**
+   * True if the application has requested that the resource request or
+   * application run be cancelled. Cancelled tasks are not subject to retry.
+   */
+
+  protected boolean cancelled;
+
+  /**
+   * Disposition of a completed task: whether it was cancelled, succeeded or
+   * failed.
+   */
+
+  public Disposition disposition;
+
+  public Throwable error;
+
+  public int tryCount;
+
+  public ContainerStatus completionStatus;
+
+  public long launchTime;
+  public long stateStartTime;
+  public long completionTime;
+
+  long cancellationTime;
+
+  public Map<String,Object> properties = new HashMap<>( );
+
+  public Task(Scheduler scheduler, TaskSpec taskSpec) {
+    taskId = ++taskCounter;
+    this.scheduler = scheduler;
+    this.taskSpec = taskSpec;
+    state = TaskState.START;
+    resetTrackingState();
+  }
+
+  /**
+   * Special constructor to create a static copy of the current
+   * task. The copy is placed in the completed tasks list.
+   * @param task
+   */
+
+  private Task(Task task) {
+    taskId = task.taskId;
+    scheduler = task.scheduler;
+    taskSpec = task.taskSpec;
+    taskGroup = task.taskGroup;
+    trackingState = task.trackingState;
+    containerRequest = task.containerRequest;
+    container = task.container;
+    state = task.state;
+    cancelled = task.cancelled;
+    disposition = task.disposition;
+    error = task.error;
+    tryCount = task.tryCount;
+    completionStatus = task.completionStatus;
+    launchTime = task.launchTime;
+    stateStartTime = task.stateStartTime;
+    completionTime = task.completionTime;
+    cancellationTime = task.cancellationTime;
+    properties.putAll( task.properties );
+  }
+
+  public void resetTrackingState( ) {
+    trackingState = scheduler.isTracked() ? TrackingState.NEW : TrackingState.UNTRACKED;
+  }
+
+  public int getId( ) { return taskId; }
+  public ContainerRequestSpec getContainerSpec() { return taskSpec.containerSpec; }
+
+  public LaunchSpec getLaunchSpec() { return taskSpec.launchSpec; }
+
+  public TaskState getState() { return state; }
+
+  public ContainerId getContainerId() {
+    assert container != null;
+    return container.getId();
+  }
+
+  public Container getContainer() {
+    assert container != null;
+    return container;
+  }
+
+  public int getTryCount() { return tryCount; }
+
+  public boolean isFailed() {
+    return disposition != null && disposition != Disposition.COMPLETED;
+  }
+
+  public Disposition getDisposition() { return disposition; }
+
+  public SchedulerStateImpl getGroup() { return taskGroup; }
+
+  public void setGroup(SchedulerStateImpl taskGroup) { this.taskGroup = taskGroup; }
+
+  public boolean retryable() {
+    return !cancelled && disposition != Disposition.COMPLETED;
+  }
+
+  public boolean isCancelled() { return cancelled; }
+
+  /**
+   * Reset the task state in preparation for a retry.
+   * Note: state reset is done by the state class.
+   */
+
+  public void reset() {
+    assert !cancelled;
+    error = null;
+    disposition = null;
+    completionStatus = null;
+    launchTime = 0;
+    completionTime = 0;
+    cancellationTime = 0;
+    container = null;
+    resetTrackingState();
+  }
+
+  public long uptime() {
+    long endTime = completionTime;
+    if (endTime == 0) {
+      endTime = System.currentTimeMillis();
+    }
+    return endTime - launchTime;
+  }
+
+  public String getHostName() {
+    if (container == null) {
+      return null;
+    }
+    return container.getNodeId().getHost();
+  }
+
+  public TrackingState getTrackingState() {
+    return trackingState;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("[id=")
+       .append(taskId)
+       .append(", type=");
+    // Scheduler is unset in some unit tests.
+    if (scheduler !=null ) {
+       buf.append(scheduler.getName());
+    }
+    buf.append(", name=")
+       .append(getName());
+    if (container != null) {
+      buf.append(", host=")
+         .append(getHostName());
+    }
+    buf.append(", state=")
+       .append(state.toString())
+       .append("]");
+    return buf.toString();
+  }
+
+  public boolean isLive() {
+    return state == TaskState.RUNNING && !cancelled;
+  }
+
+  public void cancel() {
+    cancelled = true;
+    cancellationTime = System.currentTimeMillis();
+  }
+
+  public Task copy() {
+    return new Task(this);
+  }
+
+  public String getName() {
+    return taskSpec == null ? null : taskSpec.name;
+  }
+
+  /**
+   * Label for this task displayed in log messages.
+   *
+   * @return
+   */
+
+  public String getLabel() {
+    return toString( );
+  }
+
+  public void setTrackingState(TrackingState tState) {
+    trackingState = tState;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java
new file mode 100644
index 0000000..218cd9b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+public interface TaskLifecycleListener {
+  public enum Event {
+    CREATED, ALLOCATED, RUNNING, ENDED
+  }
+
+  void stateChange(Event event, EventContext context);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java
new file mode 100644
index 0000000..4399a86
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.drill.yarn.core.LaunchSpec;
+
+public class TaskSpec {
+  /**
+   * Number of YARN vcores (virtual cores) and amount of memory (in MB) needed
+   * by this task.
+   */
+
+  public ContainerRequestSpec containerSpec;
+
+  /**
+   * Description of of the task process, environment and so on.
+   */
+
+  public LaunchSpec launchSpec;
+
+  public int maxRetries;
+
+  public String name;
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java
new file mode 100644
index 0000000..3d52105
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java
@@ -0,0 +1,895 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.Task.Disposition;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * Represents the behaviors associated with each state in the lifecycle
+ * of a task.
+ * <p>
+ * Startup process:
+ * <dl>
+ * <dt>START --> REQUESTING<dt>
+ * <dd>New task sends a container request to YARN.</dd>
+ * <dt>REQUESTING --> LAUNCHING<dt>
+ * <dd>Container received from YARN, launching the tasks's process.</dd>
+ * <dt>LAUNCHING --> RUNNING<dt>
+ * <dd>Task launched and needs no start Ack.</dd>
+ * <dt>LAUNCHING --> WAIT_START_ACK<dt>
+ * <dd>Task launched and needs a start Ack.</dd>
+ * <dt>WAIT_START_ACK --> RUNNING<dt>
+ * <dd>Start Ack received.</dd>
+ * </dl>
+ * <p>
+ * Shutdown process:
+ * <dt>RUNNING --> WAIT_END_ACK | END<dt>
+ * <dd>The resource manager reported task completion.</dd>
+ * <dt>RUNNING --> ENDING<dt>
+ * <dd>Request sent to the task for a graceful shutdown.</dd>
+ * <dt>RUNNING --> KILLING<dt>
+ * <dd>Request sent to the node manager to forcibly kill the task.</dd>
+ * <dt>ENDING --> WAIT_END_ACK | END<dt>
+ * <dd>The task gracefully exited as reported by the resource manager.</dd>
+ * <dt>ENDING --> KILLING<dt>
+ * <dd>The wait for graceful exit timed out, a forced kill message
+ *     sent to the node manager.</dd>
+ * <dt>KILLING --> WAIT_END_ACK | END<dt>
+ * <dd>The task exited as reported by the resource manager.</dd>
+ * <dt>END_ACK --> END<dt>
+ * <dd>The end-ack is received or the wait timed out.</dd>
+ * <dl>
+ * <p>
+ * This is a do-it-yourself enum. Java enums values are instances of a single
+ * class. In this version, each enum value is the sole instance of a separate
+ * class, allowing each state to have its own behavior.
+ */
+
+public abstract class TaskState {
+  /**
+   * Task that is newly created and needs a container allocated. No messages
+   * have yet been sent to YARN for the task.
+   */
+
+  private static class StartState extends TaskState {
+    protected StartState() { super(false, TaskLifecycleListener.Event.CREATED, true); }
+
+    @Override
+    public void requestContainer(EventContext context) {
+      Task task = context.task;
+      task.tryCount++;
+      context.group.dequeuePendingRequest(task);
+      if (task.cancelled) {
+        taskStartFailed(context, Disposition.CANCELLED);
+      } else {
+        transition(context, REQUESTING);
+        context.group.enqueueAllocatingTask(task);
+        task.containerRequest = context.yarn
+            .requestContainer(task.getContainerSpec());
+      }
+    }
+
+    /**
+     * Cancellation is trivial: just drop the task; no need to coordinate
+     * with YARN.
+     */
+
+    @Override
+    public void cancel(EventContext context) {
+      Task task = context.task;
+      assert !task.cancelled;
+      context.group.dequeuePendingRequest(task);
+      task.cancel();
+      taskStartFailed(context, Disposition.CANCELLED);
+    }
+  }
+
+  /**
+   * Task for which a container request has been sent but not yet received.
+   */
+
+  private static class RequestingState extends TaskState {
+    protected RequestingState() {
+      super(false, TaskLifecycleListener.Event.CREATED, true);
+    }
+
+    /**
+     * Handle REQUESING --> LAUNCHING. Indicates that we've asked YARN to start
+     * the task on the allocated container.
+     */
+
+    @Override
+    public void containerAllocated(EventContext context, Container container) {
+      Task task = context.task;
+      LOG.info(task.getLabel() + " - Received container: "
+          + DoYUtil.describeContainer(container));
+      context.group.dequeueAllocatingTask(task);
+
+      // No matter what happens below, we don't want to ask for this
+      // container again. The RM async API is a bit bizarre in this
+      // regard: it will keep asking for container over and over until
+      // we tell it to stop.
+
+      context.yarn.removeContainerRequest(task.containerRequest);
+
+      // The container is need both in the normal and in the cancellation
+      // path, so set it here.
+
+      task.container = container;
+      if (task.cancelled) {
+        context.yarn.releaseContainer(container);
+        taskStartFailed(context, Disposition.CANCELLED);
+        return;
+      }
+      task.error = null;
+      task.completionStatus = null;
+      transition(context, LAUNCHING);
+
+      // The pool that manages this task wants to know that we have
+      // a container. The task manager may want to do some task-
+      // specific setup.
+
+      context.group.containerAllocated(context.task);
+      context.getTaskManager().allocated(context);
+
+      // Go ahead and launch a task in the container using the launch
+      // specification provided by the task group (pool).
+
+      try {
+        context.yarn.launchContainer(container, task.getLaunchSpec());
+        task.launchTime = System.currentTimeMillis();
+      } catch (YarnFacadeException e) {
+        LOG.error("Container launch failed: " + task.getContainerId(), e);
+
+        // This may not be the right response. RM may still think
+        // we have the container if the above is a local failure.
+
+        task.error = e;
+        context.group.containerReleased(task);
+        task.container = null;
+        taskStartFailed(context, Disposition.LAUNCH_FAILED);
+      }
+    }
+
+    /**
+     * Cancel the container request. We must wait for the response from YARN to
+     * do the actual cancellation. For now, just mark the task as cancelled.
+     */
+
+    @Override
+    public void cancel(EventContext context) {
+      Task task = context.task;
+      context.task.cancel();
+      LOG.info(task.getLabel() + " - Cancelled at user request");
+      context.yarn.removeContainerRequest(task.containerRequest);
+      context.group.dequeueAllocatingTask(task);
+      task.disposition = Task.Disposition.CANCELLED;
+      task.completionTime = System.currentTimeMillis();
+      transition(context, END);
+      context.group.taskEnded(context.task);
+    }
+
+    /**
+     * The task is requesting a container. If the request takes too long,
+     * cancel the request and shrink the target task count. This event
+     * generally indicates that the user wants to run more tasks than
+     * the cluster has capacity.
+     */
+
+    @Override
+    public void tick(EventContext context, long curTime) {
+      Task task = context.task;
+      int timeoutSec = task.scheduler.getRequestTimeoutSec( );
+      if (timeoutSec == 0) {
+        return;
+      }
+      if (task.stateStartTime + timeoutSec * 1000 > curTime) {
+        return;
+      }
+      LOG.info(task.getLabel() + " - Request timed out after + "
+          + timeoutSec + " secs.");
+      context.yarn.removeContainerRequest(task.containerRequest);
+      context.group.dequeueAllocatingTask(task);
+      task.disposition = Task.Disposition.LAUNCH_FAILED;
+      task.completionTime = System.currentTimeMillis();
+      transition(context, END);
+      context.group.taskEnded(context.task);
+      task.scheduler.requestTimedOut();
+    }
+  }
+
+  /**
+   * Task for which a container has been allocated and the task launch request
+   * sent. Awaiting confirmation that the task is running.
+   */
+
+  private static class LaunchingState extends TaskState {
+    protected LaunchingState() {
+      super(true, TaskLifecycleListener.Event.ALLOCATED, true);
+    }
+
+    /**
+     * Handle launch failure. Results in a LAUNCHING --> END transition or
+     * restart.
+     * <p>
+     * This situation can occur, when debugging, if a timeout occurs after the
+     * allocation message, such as when, sitting in the debugger on the
+     * allocation event.
+     */
+
+    @Override
+    public void launchFailed(EventContext context, Throwable t) {
+      Task task = context.task;
+      LOG.info(task.getLabel() + " - Container start failed");
+      context.task.error = t;
+      launchFailed(context);
+    }
+
+    /**
+     * Handle LAUNCHING --> RUNNING/START_ACK. Indicates that YARN has confirmed
+     * that the task is, indeed, running.
+     */
+
+    @Override
+    public void containerStarted(EventContext context) {
+      Task task = context.task;
+
+      // If this task is tracked (that is, it is a Drillbit which
+      // we monitor using ZK) then we have to decide if we've
+      // seen the task in the tracker yet. If we have, then the
+      // task is fully running. If we haven't, then we need to
+      // wait for the start acknowledgement.
+
+      if (task.trackingState == Task.TrackingState.NEW) {
+        transition(context, WAIT_START_ACK);
+      } else {
+        transition(context, RUNNING);
+      }
+      task.error = null;
+
+      // If someone came along and marked the task as cancelled,
+      // we are now done waiting for YARN so we can immediately
+      // turn around and kill the task. (Can't kill the task,
+      // however, until YARN starts it, hence the need to wait
+      // for YARN to start the task before killing it.)
+
+      if (task.cancelled) {
+        transition(context, KILLING);
+        context.yarn.killContainer(task.getContainer());
+      }
+    }
+
+    /**
+     * Out-of-order start ACK, perhaps due to network latency. Handle by staying
+     * in this state, but later jump directly<br>
+     * LAUNCHING --> RUNNING
+     */
+
+    @Override
+    public void startAck(EventContext context) {
+      context.task.trackingState = Task.TrackingState.START_ACK;
+    }
+
+    @Override
+    public void containerCompleted(EventContext context,
+        ContainerStatus status) {
+      // Seen on Mac when putting machine to sleep.
+      // Handle by failing & retrying.
+      completed(context, status);
+      endOrAck(context);
+    }
+
+    @Override
+    public void cancel(EventContext context) {
+      context.task.cancel();
+      context.yarn.killContainer(context.task.getContainer());
+    }
+
+    @Override
+    public void tick(EventContext context, long curTime) {
+
+      // If we are canceling the task, and YARN has not reported container
+      // completion after some amount of time, just force failure.
+
+      Task task = context.task;
+      if (task.isCancelled()
+          && task.cancellationTime + Task.MAX_CANCELLATION_TIME < curTime) {
+        LOG.error(task.getLabel() + " - Launch timed out after "
+            + Task.MAX_CANCELLATION_TIME / 1000 + " secs.");
+        launchFailed(context);
+      }
+    }
+
+    private void launchFailed(EventContext context) {
+      Task task = context.task;
+      task.completionTime = System.currentTimeMillis();
+
+      // Not sure if releasing the container is needed...
+
+      context.yarn.releaseContainer(task.container);
+      context.group.containerReleased(task);
+      task.container = null;
+      taskStartFailed(context, Disposition.LAUNCH_FAILED);
+    }
+  }
+
+  /**
+   * Task has been launched, is tracked, but we've not yet received a start ack.
+   */
+
+  private static class WaitStartAckState extends TaskState {
+    protected WaitStartAckState() {
+      super(true, TaskLifecycleListener.Event.RUNNING, true);
+    }
+
+    @Override
+    public void startAck(EventContext context) {
+      context.task.trackingState = Task.TrackingState.START_ACK;
+      transition(context, RUNNING);
+    }
+
+    @Override
+    public void cancel(EventContext context) {
+      RUNNING.cancel(context);
+    }
+
+    // @Override
+    // public void containerStopped(EventContext context) {
+    // transition(context, WAIT_COMPLETE );
+    // }
+
+    @Override
+    public void containerCompleted(EventContext context,
+        ContainerStatus status) {
+      completed(context, status);
+      taskTerminated(context);
+    }
+
+    // TODO: Timeout in this state.
+  }
+
+  /**
+   * Task in the normal running state.
+   */
+
+  private static class RunningState extends TaskState {
+    protected RunningState() {
+      super(true, TaskLifecycleListener.Event.RUNNING, true);
+    }
+
+    /**
+     * Normal task completion. Implements the RUNNING --> END transition.
+     *
+     * @param status
+     */
+
+    @Override
+    public void containerCompleted(EventContext context,
+        ContainerStatus status) {
+      completed(context, status);
+      endOrAck(context);
+    }
+
+    @Override
+    public void cancel(EventContext context) {
+      Task task = context.task;
+      task.cancel();
+      if (context.group.requestStop(task)) {
+        transition(context, ENDING);
+      } else {
+        context.yarn.killContainer(task.container);
+        transition(context, KILLING);
+      }
+    }
+
+    /**
+     * The task claims that it is complete, but we think it is running. Assume
+     * that the task has started its own graceful shutdown (or the
+     * equivalent).<br>
+     * RUNNING --> ENDING
+     */
+
+    @Override
+    public void completionAck(EventContext context) {
+      context.task.trackingState = Task.TrackingState.END_ACK;
+      transition(context, ENDING);
+    }
+  }
+
+  /**
+   * Task for which a termination request has been sent to the Drill-bit, but
+   * confirmation has not yet been received from the Node Manager. (Not yet
+   * supported in the Drill-bit.
+   */
+
+  public static class EndingState extends TaskState {
+    protected EndingState() { super(true, TaskLifecycleListener.Event.RUNNING, false); }
+
+    /*
+     * Normal ENDING --> WAIT_COMPLETE transition, awaiting Resource Manager
+     * confirmation.
+     */
+
+//    @Override
+//    public void containerStopped(EventContext context) {
+//      transition(context, WAIT_COMPLETE);
+//    }
+
+    /**
+     * Normal ENDING --> WAIT_END_ACK | END transition.
+     *
+     * @param status
+     */
+
+    @Override
+    public void containerCompleted(EventContext context,
+        ContainerStatus status) {
+      completed(context, status);
+      endOrAck(context);
+    }
+
+    @Override
+    public void cancel(EventContext context) {
+      context.task.cancel();
+    }
+
+    /**
+     * If the graceful stop process exceeds the maximum timeout, go ahead and
+     * forcibly kill the process.
+     */
+
+    @Override
+    public void tick(EventContext context, long curTime) {
+      Task task = context.task;
+      if (curTime - task.stateStartTime > task.taskGroup.getStopTimeoutMs()) {
+        context.yarn.killContainer(task.container);
+        transition(context, KILLING);
+      }
+    }
+
+    @Override
+    public void completionAck(EventContext context) {
+      context.task.trackingState = Task.TrackingState.END_ACK;
+    }
+  }
+
+  /**
+   * Task for which a forced termination request has been sent to the Node
+   * Manager, but a stop message has not yet been received.
+   */
+
+  public static class KillingState extends TaskState {
+    protected KillingState() { super(true, TaskLifecycleListener.Event.RUNNING, false); }
+
+    /*
+     * Normal KILLING --> WAIT_COMPLETE transition, awaiting Resource Manager
+     * confirmation.
+     */
+
+//    @Override
+//    public void containerStopped(EventContext context) {
+//      transition(context, WAIT_COMPLETE);
+//    }
+
+    /**
+     * Normal KILLING --> WAIT_END_ACK | END transition.
+     *
+     * @param status
+     */
+
+    @Override
+    public void containerCompleted(EventContext context,
+        ContainerStatus status) {
+      completed(context, status);
+      endOrAck(context);
+    }
+
+    @Override
+    public void cancel(EventContext context) {
+      context.task.cancel();
+    }
+
+    @Override
+    public void startAck(EventContext context) {
+      // Better late than never... Happens during debugging sessions
+      // when order of messages is scrambled.
+
+      context.task.trackingState = Task.TrackingState.START_ACK;
+    }
+
+    @Override
+    public void completionAck(EventContext context) {
+      context.task.trackingState = Task.TrackingState.END_ACK;
+    }
+
+    @Override
+    public void stopTaskFailed(EventContext context, Throwable t) {
+      assert false;
+      // What to do?
+    }
+  }
+
+  /**
+   * Task exited, but we are waiting for confirmation from Zookeeper that
+   * the Drillbit registration has been removed. Required to associate
+   * ZK registrations with Drillbits. Ensures that we don't try to
+   * start a new Drillbit on a node until the previous Drillbit
+   * completely shut down, including dropping out of ZK.
+   */
+
+  private static class WaitEndAckState extends TaskState {
+    protected WaitEndAckState() {
+      super(false, TaskLifecycleListener.Event.RUNNING, false);
+    }
+
+    @Override
+    public void cancel(EventContext context) {
+      context.task.cancel();
+    }
+
+    @Override
+    public void completionAck(EventContext context) {
+      context.task.trackingState = Task.TrackingState.END_ACK;
+      taskTerminated(context);
+    }
+
+    /**
+     * Periodically check if the process is still live. We are supposed to
+     * receive events when the task becomes deregistered. But, we've seen
+     * cases where the task hangs in this state forever. Try to resolve
+     * the issue by polling periodically.
+     */
+
+    @Override
+    public void tick(EventContext context, long curTime) {
+      if(! context.getTaskManager().isLive(context)){
+        taskTerminated(context);
+      }
+    }
+  }
+
+  /**
+   * Task is completed or failed. The disposition field gives the details of the
+   * completion type. The task is not active on YARN, but could be retried.
+   */
+
+  private static class EndState extends TaskState {
+    protected EndState() {
+      super(false, TaskLifecycleListener.Event.ENDED, false);
+    }
+
+    /*
+     * Ignore out-of-order Node Manager completion notices.
+     */
+
+    // @Override
+    // public void containerStopped(EventContext context) {
+    // }
+
+    @Override
+    public void cancel(EventContext context) {
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(TaskState.class);
+
+  public static final TaskState START = new StartState();
+  public static final TaskState REQUESTING = new RequestingState();
+  public static final TaskState LAUNCHING = new LaunchingState();
+  public static final TaskState WAIT_START_ACK = new WaitStartAckState();
+  public static final TaskState RUNNING = new RunningState();
+  public static final TaskState ENDING = new EndingState();
+  public static final TaskState KILLING = new KillingState();
+  public static final TaskState WAIT_END_ACK = new WaitEndAckState();
+  public static final TaskState END = new EndState();
+
+  protected final boolean hasContainer;
+  protected final TaskLifecycleListener.Event lifeCycleEvent;
+  protected final String label;
+  protected final boolean cancellable;
+
+  public TaskState(boolean hasContainer, TaskLifecycleListener.Event lcEvent,
+      boolean cancellable) {
+    this.hasContainer = hasContainer;
+    lifeCycleEvent = lcEvent;
+    this.cancellable = cancellable;
+    String name = toString();
+    name = name.replace("State", "");
+    name = name.replaceAll("([a-z]+)([A-Z])", "$1_$2");
+    label = name.toUpperCase();
+  }
+
+  protected void endOrAck(EventContext context) {
+    if (context.task.trackingState == Task.TrackingState.START_ACK) {
+      transition(context, WAIT_END_ACK);
+    } else {
+      taskTerminated(context);
+    }
+  }
+
+  public void requestContainer(EventContext context) {
+    illegalState(context, "requestContainer");
+  }
+
+  /**
+   * Resource Manager reports that the task has been allocated a container.
+   *
+   * @param context
+   * @param container
+   */
+
+  public void containerAllocated(EventContext context, Container container) {
+    illegalState(context, "containerAllocated");
+  }
+
+  /**
+   * The launch of the container failed.
+   *
+   * @param context
+   * @param t
+   */
+
+  public void launchFailed(EventContext context, Throwable t) {
+    illegalState(context, "launchFailed");
+  }
+
+  /**
+   * Node Manager reports that the task has started execution.
+   *
+   * @param context
+   */
+
+  public void containerStarted(EventContext context) {
+    illegalState(context, "containerStarted");
+  }
+
+  /**
+   * The monitoring plugin has detected that the task has confirmed that it is
+   * fully started.
+   */
+
+  public void startAck(EventContext context) {
+    illegalState(context, "startAck");
+  }
+
+  /**
+   * The node manager request to stop a task failed.
+   *
+   * @param context
+   * @param t
+   */
+
+  public void stopTaskFailed(EventContext context, Throwable t) {
+    illegalState(context, "stopTaskFailed");
+  }
+
+  /**
+   * The monitoring plugin has detected that the task has confirmed that it has
+   * started shutdown.
+   */
+
+  public void completionAck(EventContext context) {
+    illegalState(context, "completionAck");
+  }
+
+  /**
+   * Node Manager reports that the task has stopped execution. We don't yet know
+   * if this was a success or failure.
+   *
+   * @param context
+   */
+
+  public void containerStopped(EventContext context) {
+    illegalState(context, "containerStopped");
+  }
+
+  /**
+   * Resource Manager reports that the task has completed execution and provided
+   * the completion status.
+   *
+   * @param context
+   * @param status
+   */
+
+  public void containerCompleted(EventContext context, ContainerStatus status) {
+    completed(context, status);
+    illegalState(context, "containerCompleted");
+  }
+
+  /**
+   * Cluster manager wishes to cancel this task.
+   *
+   * @param context
+   */
+
+  public void cancel(EventContext context) {
+    illegalState(context, "cancel");
+  }
+
+  public void tick(EventContext context, long curTime) {
+    // Ignore by default
+  }
+
+  /**
+   * Implement a state transition, alerting any life cycle listeners and
+   * updating the log file. Marks the start time of the new state in support of
+   * states that implement a timeout.
+   *
+   * @param context
+   * @param newState
+   */
+
+  protected void transition(EventContext context, TaskState newState) {
+    TaskState oldState = context.task.state;
+    LOG.info(context.task.getLabel() + " " + oldState.toString() + " --> "
+        + newState.toString());
+    context.task.state = newState;
+    if (newState.lifeCycleEvent != oldState.lifeCycleEvent) {
+      context.controller.fireLifecycleChange(newState.lifeCycleEvent, context);
+    }
+    context.task.stateStartTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Task failed when starting. No container has been allocated. The task
+   * will go from:<br>
+   * * --> END
+   * <p>
+   * If the run failed, and the task can be retried, it may
+   * then move from<br>
+   * END --> STARTING
+   * @param context
+   * @param disposition
+   */
+
+  protected void taskStartFailed(EventContext context,
+      Disposition disposition) {
+
+    // No container, so don't alert the task manager.
+
+    assert context.task.container == null;
+
+    context.getTaskManager().completed(context);
+    taskEnded(context, disposition);
+    retryTask(context);
+  }
+
+  /**
+   * A running task terminated. It may have succeeded or failed,
+   * this method will determine which.
+   * <p>
+   * Every task goes from:<br>
+   * * --> END
+   * <p>
+   * If the run failed, and the task can be retried, it may
+   * then move from<br>
+   * END --> STARTING
+   *
+   * @param context
+   */
+
+  protected void taskTerminated(EventContext context) {
+    Task task = context.task;
+
+    // Give the task manager a peek at the completed task.
+    // The task manager can override retry behavior. To
+    // cancel a task that would otherwise be retried, call
+    // cancel( ) on the task.
+
+    context.getTaskManager().completed(context);
+    context.group.containerReleased(task);
+    assert task.completionStatus != null;
+    if (task.completionStatus.getExitStatus() == 0) {
+      taskEnded(context, Disposition.COMPLETED);
+      context.group.taskEnded(context.task);
+    } else {
+      taskEnded(context, Disposition.RUN_FAILED);
+      retryTask(context);
+    }
+  }
+
+  /**
+   * Implements the details of marking a task as ended. Note, this method
+   * does not deregister the task with the scheduler state, we keep it
+   * registered in case we decide to retry.
+   *
+   * @param context
+   * @param disposition
+   */
+
+  private void taskEnded(EventContext context, Disposition disposition) {
+    Task task = context.task;
+    if (disposition == null) {
+      assert task.disposition != null;
+    } else {
+      task.disposition = disposition;
+    }
+    task.completionTime = System.currentTimeMillis();
+    transition(context, END);
+  }
+
+  /**
+   * Retry a task. Requires that the task currently be in the END state to provide
+   * clean state transitions. Will deregister the task if it cannot be retried
+   * because the cluster is ending or the task has failed too many times.
+   * Otherwise, starts the whole life cycle over again.
+   *
+   * @param context
+   */
+
+  private void retryTask(EventContext context) {
+    Task task = context.task;
+    assert task.state == END;
+    if (!context.controller.isLive() || !task.retryable()) {
+      context.group.taskEnded(task);
+      return;
+    }
+    if (task.tryCount > task.taskGroup.getMaxRetries()) {
+      LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount);
+      task.disposition = Disposition.TOO_MANY_RETRIES;
+      context.group.taskEnded(task);
+      return;
+    }
+    LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount);
+    context.group.taskRetried(task);
+    task.reset();
+    transition(context, START);
+    context.group.enqueuePendingRequest(task);
+  }
+
+  /**
+   * An event is called in a state where it is not expected. Log it, ignore it
+   * and hope it goes away.
+   *
+   * @param action
+   */
+
+  private void illegalState(EventContext context, String action) {
+    // Intentionally assert: fails during debugging, soldiers on in production.
+
+    assert false;
+    LOG.error(context.task.getLabel() + " - Action " + action
+        + " in wrong state: " + toString(),
+        new IllegalStateException("Action in wrong state"));
+  }
+
+  protected void completed(EventContext context, ContainerStatus status) {
+    Task task = context.task;
+    String diag = status.getDiagnostics();
+    LOG.trace(
+        task.getLabel() + " Completed, exit status: " + status.getExitStatus()
+            + (DoYUtil.isBlank(diag) ? "" : ": " + status.getDiagnostics()));
+    task.completionStatus = status;
+  }
+
+  @Override
+  public String toString() { return getClass().getSimpleName(); }
+
+  public boolean hasContainer() { return hasContainer; }
+
+  public String getLabel() { return label; }
+
+  public boolean isCancellable() {
+    return cancellable;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java
new file mode 100644
index 0000000..c90d4f8
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+public interface TaskVisitor {
+  void visit(Task task);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java
new file mode 100644
index 0000000..8ac0a5d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Exceptions thrown from the YARN facade: the wrapper around the YARN AM
+ * interfaces.
+ */
+
+@SuppressWarnings("serial")
+public class YarnFacadeException extends Exception {
+  public YarnFacadeException(String msg, Exception e) {
+    super(msg, e);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/http/AMSecurityManager.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/http/AMSecurityManager.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/http/AMSecurityManager.java
new file mode 100644
index 0000000..fbca171
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/http/AMSecurityManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster.http;
+
+/**
+ * Security manager for the Application Master. Allows a variety
+ * of security systems, including Drill's user authentication
+ * and DoY's static user/password, or an open AM web UI.
+ */
+
+public interface AMSecurityManager {
+  void init();
+
+  boolean requiresLogin();
+
+  boolean login(String user, String password);
+
+  void close();
+}