You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/03/04 17:13:47 UTC
[07/12] drill git commit: DRILL-1170: YARN integration for Drill
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java
new file mode 100644
index 0000000..73a045f
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Abstract base class for schedulers that work with persistent
+ * (long-running) tasks. Such tasks are intended to run until
+ * explicitly shut down (unlike batch tasks that run until
+ * some expected completion.)
+ * <p>
+ * Provides a target quantity of tasks
+ * (see {@link #getTarget()}), along with operations to increase,
+ * decrease or set the target number.
+ * <p>
+ * The scheduler acts as a controller: starting new tasks as needed to
+ * match the desired target, or stopping tasks as needed when the
+ * target level is reduced.
+ */
+
+public abstract class PersistentTaskScheduler extends AbstractScheduler {
+  private static final Log LOG = LogFactory.getLog(PersistentTaskScheduler.class);
+
+  /** Target number of tasks; the constructor and {@link #resize(int)} keep it at 0 or above. */
+  protected int quantity;
+
+  /**
+   * Creates a scheduler with an initial target task count.
+   *
+   * @param type scheduler type, passed to the base class
+   * @param name scheduler name, passed to the base class
+   * @param quantity initial target number of tasks; negative values are
+   *          treated as 0, the same rule {@link #resize(int)} applies
+   */
+  public PersistentTaskScheduler(String type, String name, int quantity) {
+    super(type, name);
+    this.quantity = Math.max(0, quantity);
+  }
+
+  /**
+   * Set the target number of tasks to the quantity given. This only records
+   * the new target; tasks are started or cancelled by {@link #adjust()}.
+   *
+   * @param level the target number of tasks; negative values are treated as 0
+   * @return the target actually set
+   */
+  @Override
+  public int resize(int level) {
+    quantity = Math.max(0, level);
+    return quantity;
+  }
+
+  @Override
+  public int getTarget() { return quantity; }
+
+  /**
+   * Indicate that a task is completed. Normally occurs only
+   * when shutting down excess tasks. This class takes no action.
+   *
+   * @param task the task that completed
+   */
+  @Override
+  public void completed(Task task) { }
+
+  /**
+   * Progress for persistent tasks defaults to the ratio of
+   * running tasks to target level. Thus, a persistent cluster
+   * will normally report 100% progress.
+   *
+   * @return a two-element array: the current task count (capped at the
+   *         target), followed by the target
+   */
+  @Override
+  public int[] getProgress() {
+    int activeCount = state.getTaskCount();
+    return new int[] { Math.min(activeCount, quantity), quantity };
+  }
+
+  /**
+   * Adjust the number of running tasks to better match the target
+   * by starting or stopping tasks as needed.
+   */
+  @Override
+  public void adjust() {
+    int activeCount = state.getTaskCount();
+    int delta = quantity - activeCount;
+    if (delta > 0) {
+      addTasks(delta);
+    } else if (delta < 0) {
+      cancelTasks(activeCount);
+    }
+  }
+
+  /**
+   * Cancel enough tasks to bring the task count down to the target. We exclude
+   * any tasks that are already in the process of being cancelled. Because we
+   * ignore those tasks, it might be that we want to reduce the task count, but
+   * there is nothing left to cancel.
+   *
+   * @param activeCount the current task count, which still includes tasks
+   *          that have been cancelled but have not yet ended
+   */
+  private void cancelTasks(int activeCount) {
+    int cancelled = state.getCancelledTaskCount();
+    int cancellable = activeCount - cancelled;
+    int toCancel = cancellable - quantity;
+
+    // Report the number that will really be cancelled, not the raw task count.
+
+    LOG.info("[" + getName() + "] - " + activeCount + " tasks, target is " +
+        quantity + ". " + cancelled + " are already cancelled, " +
+        Math.max(0, toCancel) + " more will be cancelled.");
+    if (toCancel <= 0) {
+      return;
+    }
+
+    // Tasks still waiting for a container are cancelled first.
+
+    int remaining = toCancel;
+    for (Task task : state.getStartingTasks()) {
+      state.cancel(task);
+      if (--remaining == 0) {
+        return;
+      }
+    }
+    for (Task task : state.getActiveTasks()) {
+      state.cancel(task);
+      if (--remaining == 0) {
+        return;
+      }
+    }
+
+    // If we get here it means something has gotten out of whack.
+
+    LOG.error("Tried to cancel " + toCancel + " tasks, but " + remaining + " could not be cancelled.");
+    assert false;
+  }
+
+  /**
+   * The persistent scheduler has no fixed sequence of tasks to run, it launches
+   * a set and is never "done". For purposes of completion tracking claim we
+   * have no further tasks.
+   *
+   * @return false
+   */
+  @Override
+  public boolean hasMoreTasks() { return false; }
+
+  /**
+   * Responds to a timed-out container request by lowering the target by one.
+   */
+  @Override
+  public void requestTimedOut() {
+
+    // We requested a node a while back, requested a container from YARN,
+    // but waited too long to receive it. Most likely cause is that we
+    // want a container on a node that either does not exist, or is too
+    // heavily loaded. (That is, we have a 3-node cluster and are requesting
+    // a 4th node. Or, we have 2 nodes but node 3 has insufficient resources.)
+    // In either case, we're not likely to ever get the container, so just
+    // reduce the target size to what we can get.
+
+    if (quantity <= 0) {
+
+      // Log first, then assert, so the message is written even when
+      // assertions are enabled.
+
+      LOG.error("Container timed out, but target quantity is already 0!");
+      assert false;
+    } else {
+      quantity--;
+      LOG.info("Container request timed out. Reducing target container count by 1 to " + quantity);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java
new file mode 100644
index 0000000..7e1c9a3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Interface for objects that are polled on each
+ * controller clock tick in order to perform
+ * time-based tasks.
+ */
+
+public interface Pollable {
+ /**
+ * Called on each controller clock tick.
+ *
+ * @param curTime the time of the tick. NOTE(review): presumably wall-clock
+ * milliseconds as from {@code System.currentTimeMillis()} -- confirm
+ * against the caller.
+ */
+ public void tick(long curTime);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PulseRunnable.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PulseRunnable.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PulseRunnable.java
new file mode 100644
index 0000000..81d5a5d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PulseRunnable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Clock driver that calls a callback once each pulse period. Used to react to
+ * time-based events such as timeouts, checking for changed files, etc.
+ * This is called a "pulse" because it is periodic, like your pulse. But,
+ * unlike the "heartbeat" between the AM and YARN or the AM and ZK,
+ * this is purely internal.
+ */
+
+public class PulseRunnable implements Runnable {
+  private static final Log LOG = LogFactory.getLog(PulseRunnable.class);
+
+  /**
+   * Interface implemented to receive calls on each clock "tick."
+   */
+  public interface PulseCallback {
+    /**
+     * @param curTime the value of {@link System#currentTimeMillis()} read
+     *          just before the call
+     */
+    void onTick(long curTime);
+  }
+
+  private final int pulsePeriod;
+  private final PulseRunnable.PulseCallback callback;
+
+  /** Loop-continue flag: read at the top of each iteration, cleared by {@link #stop()}. */
+  public AtomicBoolean isLive = new AtomicBoolean(true);
+
+  /**
+   * @param pulsePeriodMS time to sleep between ticks, in milliseconds
+   * @param callback receiver of the tick events
+   */
+  public PulseRunnable(int pulsePeriodMS,
+      PulseRunnable.PulseCallback callback) {
+    pulsePeriod = pulsePeriodMS;
+    this.callback = callback;
+  }
+
+  /**
+   * Sleeps for one pulse period, fires the callback, and repeats until
+   * {@link #stop()} is called or the thread is interrupted while sleeping.
+   */
+  @Override
+  public void run() {
+    while (isLive.get()) {
+      try {
+        Thread.sleep(pulsePeriod);
+      } catch (InterruptedException e) {
+
+        // Thread.sleep clears the interrupt status when it throws. Set it
+        // again so that whoever runs this Runnable can still observe it.
+
+        Thread.currentThread().interrupt();
+        break;
+      }
+      try {
+        callback.onTick(System.currentTimeMillis());
+      } catch (Exception e) {
+
+        // Ignore exceptions. Seems strange, but is required to allow
+        // graceful shutdown of the AM when errors occur. For example, we
+        // start tasks on tick events. If those tasks fail, the timer
+        // goes down. But, the timer is also needed to time out failed
+        // requests in order to bring down the AM. So, just log the error
+        // and soldier on.
+
+        LOG.error("Timer thread caught, ignored an exception", e);
+      }
+    }
+  }
+
+  /**
+   * Asks the loop to exit. Does not interrupt the thread, so a sleep or tick
+   * already in progress finishes first.
+   */
+  public void stop() { isLive.set(false); }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java
new file mode 100644
index 0000000..ff29bdf
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Callback from the ZooKeeper registry to announce events
+ * related to Drillbit registration.
+ */
+
+public interface RegistryHandler {
+ // NOTE(review): no implementation or caller of this interface is visible
+ // here, so the per-method notes below are inferred from the names and
+ // must be confirmed before being relied upon.
+
+ /** Presumably marks the named host as in use -- TODO confirm. */
+ void reserveHost(String hostName);
+
+ /** Presumably undoes {@link #reserveHost(String)} for the named host -- TODO confirm. */
+ void releaseHost(String hostName);
+
+ /**
+ * Presumably acknowledges that the task's process has started, passing a
+ * property (key and value) to associate with the task -- TODO confirm.
+ */
+ void startAck(Task task, String propertyKey, Object value);
+
+ /** Presumably acknowledges that the task's process has ended -- TODO confirm. */
+ void completionAck(Task task, String endpointProperty);
+
+ /** Presumably signals that the registry itself is no longer available -- TODO confirm. */
+ void registryDown();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java
new file mode 100644
index 0000000..7f8be0c
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.drill.yarn.core.LaunchSpec;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * The scheduler describes the set of tasks to run. It provides the details
+ * required to launch each task and optionally a specification of the containers
+ * required to run the task.
+ * <p>
+ * Schedulers can manage batch task (which do their job and complete), or
+ * persistent tasks (which run until terminated.)
+ * <p>
+ * The scheduler tracks task completion (for batch tasks) and task levels (for
+ * persistent tasks.)
+ */
+
+public interface Scheduler {
+ /**
+ * Task-specific operations supplied by the scheduler.
+ * NOTE(review): only the two methods documented below have visible usage
+ * in this file; the contracts of the others need confirming.
+ */
+ public interface TaskManager {
+ /**
+ * @return the limit on container requests, as used by
+ * {@code SchedulerStateImpl.requestContainers}
+ */
+ int maxConcurrentAllocs();
+
+ LaunchSpec getLaunchSpec(Task task);
+
+ void allocated(EventContext context);
+
+ /**
+ * Request a graceful stop of the task.
+ *
+ * @return true if the graceful stop request was sent, false if not, or if
+ * this task type has no graceful stop
+ */
+ boolean stop(Task task);
+
+ void completed(EventContext context);
+
+ boolean isLive(EventContext context);
+ }
+
+ /**
+ * Controller-assigned priority for this scheduler. Used to differentiate
+ * container requests by scheduler.
+ *
+ * @param priority the priority assigned by the controller
+ */
+
+ void setPriority(int priority);
+
+ /**
+ * Register the state object that tracks tasks launched by this scheduler.
+ *
+ * @param state the state object to start or stop tasks through
+ */
+
+ void registerState(SchedulerState state);
+
+ String getName();
+
+ String getType();
+
+ /**
+ * Whether tasks from this scheduler should incorporate app startup/shutdown
+ * acknowledgements (acks) into the task lifecycle.
+ *
+ * @return true if acks are part of the task lifecycle, false otherwise
+ */
+
+ boolean isTracked();
+
+ TaskManager getTaskManager();
+
+ /**
+ * Get the desired number of running tasks.
+ *
+ * @return the desired number of running tasks
+ */
+ int getTarget();
+
+ /**
+ * Increase (positive) or decrease (negative) the number of desired tasks by
+ * the given amount.
+ *
+ * @param delta the number of tasks to add (positive) or remove (negative)
+ */
+ void change(int delta);
+
+ /**
+ * Set the number of desired tasks to the given level.
+ *
+ * @param level the desired number of tasks
+ * @return the actual resize level, which may be lower than the requested
+ * level if the system cannot provide the requested level
+ */
+
+ int resize(int level);
+
+ /**
+ * Informs the scheduler that the given task has ended.
+ *
+ * @param task the task that ended
+ */
+ void completed(Task task);
+
+ /**
+ * Adjust the number of running tasks to better track the desired number.
+ * Starts or stops tasks using the {@link SchedulerState} registered with
+ * {@link #registerState(SchedulerState)}.
+ */
+
+ void adjust();
+
+ /**
+ * Return an estimate of progress given as a ratio of (work completed, total
+ * work).
+ *
+ * @return the two terms of the ratio: work completed, then total work
+ */
+ int[] getProgress();
+
+ /**
+ * If this is a batch scheduler, whether tasks remain to be run for the
+ * batch. If this is a persistent task scheduler, always returns false.
+ *
+ * @return true if the scheduler has more tasks to run, false if the
+ * scheduler has no more tasks or manages a set of long-running tasks
+ */
+ boolean hasMoreTasks();
+
+ /**
+ * For reporting, get the YARN resources requested by processes in
+ * this pool.
+ * @return the container request specification used by this pool
+ */
+
+ ContainerRequestSpec getResource( );
+
+ // NOTE(review): presumably constrains this scheduler's container request
+ // to fit within maxResource, throwing AMException if it cannot -- confirm.
+ void limitContainerSize(Resource maxResource) throws AMException;
+
+ /**
+ * Maximum amount of time a task may wait in the REQUESTING state before
+ * its request is cancelled. YARN will happily wait forever for a resource,
+ * this setting forcibly cancels the request at timeout.
+ *
+ * @return the number of seconds to wait for timeout. 0 means no timeout
+ */
+
+ int getRequestTimeoutSec();
+
+ /**
+ * Informs the scheduler that a YARN resource request timed out. The scheduler
+ * can either retry or (more productively) assume that the requested node is
+ * not available and adjust its target size downward.
+ */
+
+ void requestTimedOut();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java
new file mode 100644
index 0000000..7a1f8bd
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.List;
+
+/**
+ * The cluster state for tasks managed by a scheduler. Abstracts away the
+ * details of managing tasks, allowing the scheduler to work only with overall
+ * number of tasks.
+ */
+
+public interface SchedulerState {
+ /**
+ * The number of tasks in any active (non-ended) lifecycle state.
+ *
+ * @return the number of tasks that have not yet ended
+ */
+
+ int getTaskCount();
+
+ /**
+ * The number of active tasks that have been cancelled, but have not yet
+ * ended.
+ *
+ * @return the number of cancelled tasks that are still waiting to end
+ */
+
+ int getCancelledTaskCount();
+
+ /**
+ * Returns the list of tasks awaiting a container request to be sent to YARN
+ * or for which a container request has been sent to YARN, but no container
+ * allocation has yet been received. Such tasks are simple to cancel. The list
+ * does not contain any tasks in this state which have previously been
+ * cancelled.
+ *
+ * @return the not-yet-cancelled tasks that do not have a container yet
+ */
+
+ List<Task> getStartingTasks();
+
+ /**
+ * Returns the list of active tasks that have not yet been cancelled. Active
+ * tasks are any task for which a container has been assigned, but has not yet
+ * received a RM container completion event.
+ *
+ * @return the not-yet-cancelled tasks that hold a container
+ */
+
+ List<Task> getActiveTasks();
+
+ /**
+ * Start the given task.
+ *
+ * @param task the task to start
+ */
+
+ void start(Task task);
+
+ /**
+ * Cancel the given task.
+ *
+ * @param task the task to cancel
+ */
+ void cancel(Task task);
+
+ // NOTE(review): presumably the controller that owns this state -- confirm.
+ ClusterController getController();
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java
new file mode 100644
index 0000000..65e8f2a
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+/**
+ * Represents the set of commands called by the cluster controller to manage the
+ * state of tasks within a task group. Each task group is managed by a
+ * scheduler.
+ */
+
+public interface SchedulerStateActions {
+ /**
+ * Returns the name of the scheduler associated with this task action group.
+ *
+ * @return the scheduler's name
+ */
+
+ String getName();
+
+ /**
+ * Returns the scheduler associated with this task group.
+ *
+ * @return the scheduler for this task group
+ */
+
+ Scheduler getScheduler();
+
+ /**
+ * Adjust the number of running tasks as needed to balance the number of
+ * running tasks with the desired number. May result in no change if the
+ * cluster is already in balance (or is in the process of achieving balance.)
+ */
+
+ void adjustTasks();
+
+ /**
+ * Request containers for tasks that we wish to start, making at most
+ * maxRequests requests.
+ *
+ * @param context event context used to carry each task being requested
+ * @param maxRequests upper limit on the number of requests to make
+ * @return in {@code SchedulerStateImpl}: false when no task was waiting
+ * to make a request, true otherwise
+ */
+
+ boolean requestContainers(EventContext context, int maxRequests);
+
+ /**
+ * A container request has been granted. Match the container up with the first
+ * task waiting for a container and launch the task.
+ *
+ * @param context event context for the allocation
+ * @param container the container that was granted
+ */
+
+ void containerAllocated(EventContext context, Container container);
+
+ /**
+ * Shut down this task group by canceling all tasks not already cancelled.
+ *
+ * @param context event context used to carry each task being cancelled
+ */
+
+ void shutDown(EventContext context);
+
+ /**
+ * Determine if this task group is done. It is done when there are no active
+ * tasks and the controller itself is shutting down. This latter check
+ * differentiates the start state (when no tasks are active) from the end
+ * state. The AM will not shut down until all task groups are done.
+ *
+ * @return true if this task group is done
+ */
+
+ boolean isDone();
+
+ int getTaskCount( );
+
+ int getLiveCount();
+
+ int getRequestCount( );
+
+ void visitTaskModels( TaskVisitor visitor );
+
+ /**
+ * Deliver a clock tick to the tasks in this group.
+ *
+ * @param context event context used to carry each task being ticked
+ * @param curTime the time of the tick
+ */
+ void checkTasks(EventContext context, long curTime);
+
+ void cancel(Task task);
+
+ Task getTask(int id);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java
new file mode 100644
index 0000000..4c85cf3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Manages the set of tasks associated with a scheduler. The scheduler decides
+ * which tasks to run or stop; the task group manages the life-cycle of the
+ * tasks for the given scheduler.
+ * <p>
+ * Schedulers, and hence their groups, define a priority. When starting, higher
+ * priority (lower priority value) groups run before lower priority groups.
+ * Similarly, when shrinking the cluster, lower priority groups shrink before
+ * higher priority groups.
+ */
+
+public final class SchedulerStateImpl
+ implements SchedulerState, SchedulerStateActions {
+ static final Log LOG = LogFactory.getLog(SchedulerStateImpl.class);
+
+ private final Scheduler scheduler;
+
+ private final ClusterControllerImpl controller;
+
+ /**
+ * Tracks the tasks to be started, but for which no work has yet been done.
+ * (State == PENDING).
+ */
+
+ protected List<Task> pendingTasks = new LinkedList<>();
+
+ /**
+ * Tracks the tasks for which containers have been requested. (State ==
+ * REQUESTED).
+ */
+
+ protected List<Task> allocatingTasks = new LinkedList<>();
+
+ /**
+ * Tracks running tasks: those that have been allocated containers and are
+ * starting, running, failed or ended. We use a map for this because, during
+ * these states, the task is identified by its container. (State == LAUNCHING,
+ * RUNNING or ENDING).
+ */
+
+ protected Map<ContainerId, Task> activeContainers = new HashMap<>();
+
+ /**
+  * Creates the task group for a scheduler and registers this object with
+  * that scheduler as its state.
+  *
+  * @param controller the cluster controller to be told of task events
+  * @param scheduler the scheduler whose tasks this group tracks
+  */
+ public SchedulerStateImpl(ClusterControllerImpl controller, Scheduler scheduler) {
+   this.scheduler = scheduler;
+   this.controller = controller;
+   this.scheduler.registerState(this);
+ }
+
+ /** Reports the name of the scheduler that this task group serves. */
+ @Override
+ public String getName() { return scheduler.getName(); }
+
+ /** Reports the retry limit as supplied by the cluster controller. */
+ public int getMaxRetries() { return controller.getMaxRetries(); }
+
+ /** Reports the stop timeout, in ms, as supplied by the cluster controller. */
+ public int getStopTimeoutMs() { return controller.getStopTimeoutMs(); }
+
+ @Override
+ public Scheduler getScheduler() { return scheduler; }
+
+ /**
+  * Adopt a new task into this group, then place it on the pending queue so
+  * that a container will be requested for it.
+  *
+  * @param task a task that does not yet belong to a group
+  */
+ @Override
+ public void start(Task task) {
+   assert null == task.getGroup();
+   task.setGroup(this);
+   enqueuePendingRequest(task);
+ }
+
+ /**
+  * Add a task to the queue of tasks waiting to send a container request to
+  * YARN, then announce the task's creation to the lifecycle listeners.
+  *
+  * @param task a task not currently held in any of this group's collections
+  */
+ public void enqueuePendingRequest(Task task) {
+   assert !(activeContainers.containsValue(task)
+       || allocatingTasks.contains(task)
+       || pendingTasks.contains(task));
+   pendingTasks.add(task);
+
+   // Special initial-state notification
+
+   EventContext createdContext = new EventContext(controller, task);
+   controller.fireLifecycleChange(TaskLifecycleListener.Event.CREATED,
+       createdContext);
+ }
+
+ /** Reports the request limit set by the scheduler's task manager. */
+ public int maxCurrentRequests() { return scheduler.getTaskManager().maxConcurrentAllocs(); }
+
+ @Override
+ public boolean requestContainers(EventContext context, int maxRequests) {
+ if (pendingTasks.isEmpty()) {
+ return false;
+ }
+
+ // Limit the maximum number of requests to the limit set by
+ // the scheduler.
+
+ maxRequests = Math.min(maxRequests, maxCurrentRequests());
+
+ // Further limit requests to account for in-flight requests.
+
+ maxRequests -= allocatingTasks.size( );
+
+ // Request containers as long as there are pending tasks remaining.
+
+ for (int i = 0; i < maxRequests && !pendingTasks.isEmpty(); i++) {
+ context.setTask(pendingTasks.get(0));
+ context.getState().requestContainer(context);
+ }
+ return true;
+ }
+
+ /**
+  * Take a task off the queue of tasks waiting to send a container request.
+  * Moving the task to its next home (the allocating queue or the completed
+  * task list) is the caller's job.
+  *
+  * @param task a task currently on the pending queue
+  */
+ public void dequeuePendingRequest(Task task) {
+   assert !activeContainers.containsValue(task)
+       && !allocatingTasks.contains(task)
+       && pendingTasks.contains(task);
+   pendingTasks.remove(task);
+ }
+
+ /**
+  * Add a task to the queue of tasks awaiting an allocation response from
+  * YARN.
+  *
+  * @param task a task not currently held in any of this group's collections
+  */
+ public void enqueueAllocatingTask(Task task) {
+   assert !(activeContainers.containsValue(task)
+       || allocatingTasks.contains(task)
+       || pendingTasks.contains(task));
+   allocatingTasks.add(task);
+ }
+
+ /**
+  * Handle a container granted by YARN. A container whose ID is already
+  * tracked is logged and ignored; a container that no task is waiting for is
+  * released; otherwise the container goes to the first task on the
+  * allocating queue.
+  *
+  * @param context event context; its task is set to the receiving task
+  * @param container the granted container
+  */
+ @Override
+ public void containerAllocated(EventContext context, Container container) {
+   if (activeContainers.containsKey(container.getId())) {
+     LOG.error("Container allocated again: " + DoYUtil.labelContainer(container));
+   } else if (allocatingTasks.isEmpty()) {
+
+     // Not sure why this happens. Maybe only in debug mode
+     // due stopping execution one thread while the RM
+     // heartbeat keeps sending our request over & over?
+     // One known case: the user requests a container. While YARN is
+     // considering the request, the user cancels the task.
+
+     LOG.warn("Releasing unwanted container: " + DoYUtil.labelContainer(container) );
+     context.yarn.releaseContainer(container);
+   } else {
+     context.setTask(allocatingTasks.get(0));
+     context.getState().containerAllocated(context, container);
+   }
+ }
+
+ /**
+  * Deliver a clock tick to every task in the allocating, pending and active
+  * collections, in that order.
+  *
+  * @param context event context; its task is set to each task in turn
+  * @param curTime the time of the tick
+  */
+ @Override
+ public void checkTasks(EventContext context, long curTime) {
+
+   // Each collection is copied immediately before it is walked: a tick may
+   // cause a timeout that turns around and modifies these collections.
+
+   List<Task> snapshot = new ArrayList<>(allocatingTasks);
+   for (Task allocating : snapshot) {
+     context.setTask(allocating);
+     context.getState().tick(context, curTime);
+   }
+
+   snapshot = new ArrayList<>(pendingTasks);
+   for (Task pending : snapshot) {
+     context.setTask(pending);
+     context.getState().tick(context, curTime);
+   }
+
+   snapshot = new ArrayList<>(activeContainers.values());
+   for (Task active : snapshot) {
+     context.setTask(active);
+     context.getState().tick(context, curTime);
+   }
+ }
+
+ /**
+  * Take a task off the list of those waiting for a container allocation,
+  * whether because the allocation arrived or because it was cancelled.
+  * Moving the task to its next collection is the caller's job.
+  *
+  * @param task a task currently on the allocating list
+  */
+ public void dequeueAllocatingTask(Task task) {
+   assert allocatingTasks.contains(task);
+   allocatingTasks.remove(task);
+ }
+
+ /**
+  * Record that a task now holds a container: from here on the task is
+  * tracked by its container ID, and the controller is told of the
+  * allocation.
+  *
+  * @param task a task not currently held in any of this group's collections
+  */
+ public void containerAllocated(Task task) {
+   assert !(activeContainers.containsValue(task)
+       || allocatingTasks.contains(task)
+       || pendingTasks.contains(task));
+   activeContainers.put(task.getContainerId(), task);
+   controller.containerAllocated(task);
+ }
+
+ /**
+  * Record that a task no longer holds its container: the task stops being
+  * tracked by container ID, and the controller is told of the release.
+  *
+  * @param task a task currently tracked by its container ID
+  */
+ public void containerReleased(Task task) {
+   assert activeContainers.containsKey(task.getContainerId());
+
+   // Drop the container-to-task mapping before notifying the controller.
+
+   activeContainers.remove(task.getContainerId());
+   controller.containerReleased(task);
+ }
+
+ /**
+  * Record the end of a task, whatever its disposition. The scheduler and the
+  * controller are told about the task first; if that leaves this group with
+  * nothing left to do, the controller is also told that the whole group has
+  * completed.
+  *
+  * @param task the task that ended
+  */
+
+ public void taskEnded(Task task) {
+   scheduler.completed(task);
+   controller.taskEnded(task);
+
+   // Evaluated only after the notifications above, exactly as before.
+
+   final boolean groupFinished = isDone();
+   if (groupFinished) {
+     controller.taskGroupCompleted(this);
+   }
+   LOG.info(task.toString() + " - Task completed" );
+ }
+
+ /**
+ * Mark that a task is about to be retried. Task still retains its state from
+ * the current try. This group keeps no bookkeeping of its own for the event;
+ * it only forwards the notification to the controller.
+ *
+ * @param task the task that is about to be retried
+ */
+
+ public void taskRetried(Task task) {
+ controller.taskRetried(task);
+ }
+
+ @Override
+ public void shutDown(EventContext context) {
+
+   // Starting tasks are cancelled first; the active list is computed only
+   // after that pass has finished.
+
+   cancelEach(context, getStartingTasks());
+   cancelEach(context, getActiveTasks());
+ }
+
+ /**
+  * Send a cancel event to every task in the list.
+  *
+  * @param context event context; its task is set to each list entry in turn
+  * @param victims tasks to cancel
+  */
+
+ private void cancelEach(EventContext context, List<Task> victims) {
+   for (Task victim : victims) {
+     context.setTask(victim);
+     context.getState().cancel(context);
+   }
+ }
+
+ /**
+  * Report whether this task group holds any task that is still in the active
+  * part of its life-cycle: pending, allocating or active.
+  *
+  * @return true if the task count is non-zero
+  */
+
+ public boolean hasTasks() {
+   final int total = getTaskCount();
+   return total != 0;
+ }
+
+ @Override
+ public boolean isDone() {
+
+   // Done only when no task remains here and the scheduler has no more
+   // tasks to offer. The scheduler is consulted only if the group is empty.
+
+   if (hasTasks()) {
+     return false;
+   }
+   return !scheduler.hasMoreTasks();
+ }
+
+ /**
+ * Ask the scheduler to adjust its tasks. All of the work is done by
+ * {@code scheduler.adjust()}; this method only delegates.
+ */
+
+ @Override
+ public void adjustTasks() {
+ scheduler.adjust();
+ }
+
+ /**
+ * Request a graceful stop of the task. Delegates to the scheduler's task
+ * manager to do the actual work.
+ *
+ * @param task the task to stop
+ * @return true if the graceful stop request was sent, false if not, or if
+ * this task type has no graceful stop
+ */
+
+ public boolean requestStop(Task task) {
+ return scheduler.getTaskManager().stop(task);
+ }
+
+ @Override
+ public int getTaskCount() {
+ return pendingTasks.size() + allocatingTasks.size()
+ + activeContainers.size();
+ }
+
+ @Override
+ public int getCancelledTaskCount() {
+
+   // TODO Crude first cut. This value should be maintained
+   // as a count.
+
+   return countCancelled(pendingTasks)
+       + countCancelled(allocatingTasks)
+       + countCancelled(activeContainers.values());
+ }
+
+ /**
+  * Count the tasks in one collection that are flagged as cancelled.
+  *
+  * @param tasks collection to scan
+  * @return number of cancelled tasks found
+  */
+
+ private int countCancelled(Iterable<? extends Task> tasks) {
+   int cancelled = 0;
+   for (Task candidate : tasks) {
+     if (candidate.isCancelled()) {
+       cancelled++;
+     }
+   }
+   return cancelled;
+ }
+
+ @Override
+ public List<Task> getStartingTasks() {
+
+   // Pending tasks come first, then allocating tasks; cancelled tasks are
+   // left out of both.
+
+   final List<Task> starting = new ArrayList<>();
+   appendUncancelled(pendingTasks, starting);
+   appendUncancelled(allocatingTasks, starting);
+   return starting;
+ }
+
+ /**
+  * Copy every task that is not cancelled from the source to the end of the
+  * sink, preserving the source's iteration order.
+  */
+
+ private void appendUncancelled(Iterable<? extends Task> source, List<Task> sink) {
+   for (Task candidate : source) {
+     if (candidate.isCancelled()) {
+       continue;
+     }
+     sink.add(candidate);
+   }
+ }
+
+ @Override
+ public List<Task> getActiveTasks() {
+
+   // A fresh list of the container-holding tasks that are not cancelled.
+
+   final List<Task> running = new ArrayList<>();
+   for (Task candidate : activeContainers.values()) {
+     if (candidate.isCancelled()) {
+       continue;
+     }
+     running.add(candidate);
+   }
+   return running;
+ }
+
+ /**
+ * Cancel a single task. Builds a fresh event context for the task, logs the
+ * cancellation, then invokes {@code cancel} on the state object returned by
+ * that context. What cancellation actually entails is decided by that state.
+ *
+ * @param task the task to cancel
+ */
+
+ @Override
+ public void cancel(Task task) {
+ EventContext context = new EventContext(controller, task);
+ LOG.info( task.getLabel() + " Task cancelled" );
+ context.getState().cancel(context);
+ }
+
+ @Override
+ public int getLiveCount() {
+
+   // Only tasks that hold a container are examined; each one that reports
+   // itself live adds one to the total.
+
+   int live = 0;
+   for (Task candidate : activeContainers.values()) {
+     live += candidate.isLive() ? 1 : 0;
+   }
+   return live;
+ }
+
+ @Override
+ public void visitTaskModels(TaskVisitor visitor) {
+
+   // Visit order: pending tasks, then allocating tasks, then tasks that
+   // hold a container.
+
+   visitEach(pendingTasks, visitor);
+   visitEach(allocatingTasks, visitor);
+   visitEach(activeContainers.values(), visitor);
+ }
+
+ /**
+  * Present every task of one collection to the visitor.
+  */
+
+ private void visitEach(Iterable<? extends Task> tasks, TaskVisitor visitor) {
+   for (Task current : tasks) {
+     visitor.visit(current);
+   }
+ }
+
+ @Override
+ public Task getTask(int id) {
+ for (Task task : pendingTasks) {
+ if (task.getId() == id) {
+ return task;
+ }
+ }
+ for (Task task : allocatingTasks) {
+ if (task.getId() == id) {
+ return task;
+ }
+ }
+ for (Task task : activeContainers.values()) {
+ if (task.getId() == id) {
+ return task;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Number of tasks currently in the allocating collection, that is, tasks
+ * waiting for a container allocation.
+ */
+
+ @Override
+ public int getRequestCount() {
+ return allocatingTasks.size();
+ }
+
+ /** Returns the cluster controller that this task group reports to. */
+
+ @Override
+ public ClusterController getController( ) { return controller; }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java
new file mode 100644
index 0000000..147f5f7
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.drill.yarn.core.LaunchSpec;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+
+/**
+ * AM-side state of individual containers. This class is mostly
+ * a holder of state. Behavior is provided by the
+ * {@link TaskState} subclasses.
+ */
+
+public class Task {
+  /**
+   * Tracking plugin state. A task can be untracked, or moves
+   * through the states<br>
+   * NEW --> START_ACK --> END_ACK
+   * <p>
+   * Tracking state is separate from, but integrated with,
+   * task state. This is because, due to latency, tracking
+   * events may be slightly out of sync with YARN events.
+   */
+
+  public enum TrackingState
+  {
+    UNTRACKED( "N/A" ),
+    NEW( "Waiting" ),
+    START_ACK( "OK" ),
+    END_ACK( "Deregistered" );
+
+    private final String displayName;
+
+    private TrackingState( String displayName ) {
+      this.displayName = displayName;
+    }
+
+    /** Human-readable label for this tracking state. */
+
+    public String getDisplayName( ) { return displayName; }
+  }
+
+  /**
+   * Outcome recorded in {@link #disposition} for a task that has ended.
+   */
+
+  public enum Disposition
+  {
+    CANCELLED, LAUNCH_FAILED, RUN_FAILED, COMPLETED, TOO_MANY_RETRIES, RETRIED
+  }
+
+  /**
+   * Maximum amount of time to wait when canceling a job in the REQUESTING
+   * state. YARN will happily wait forever for a resource, this setting allows
+   * the user to request to cancel a task, give YARN a while to respond, then
+   * forcibly cancel the job at timeout.
+   */
+
+  public static final long MAX_CANCELLATION_TIME = 10_000; // ms = 10s
+
+  /**
+   * Tasks receive a sequential internal task ID. Since all task
+   * creation is single-threaded, no additional concurrency controls
+   * are needed to protect this value.
+   */
+
+  private static volatile int taskCounter = 0;
+
+  /**
+   * Internal identifier for the task.
+   */
+
+  public final int taskId;
+
+  /**
+   * Scheduler handed to the constructor. It supplies the task type name used
+   * in {@link #toString()} and decides whether the task is tracked.
+   */
+
+  public final Scheduler scheduler;
+
+  /**
+   * Identifies the type of container needed and the details of the task to run.
+   */
+
+  public TaskSpec taskSpec;
+
+  /**
+   * The scheduler group that manages this task.
+   */
+
+  public SchedulerStateImpl taskGroup;
+
+  /**
+   * Tracking state for an additional task tracker (such as using
+   * ZooKeeper to track Drill-bits.)
+   */
+
+  protected TrackingState trackingState;
+
+  /**
+   * Tracks the container request between request and allocation. We must pass
+   * the container request back to YARN to remove it once it is allocated.
+   */
+
+  public ContainerRequest containerRequest;
+
+  /**
+   * The YARN container assigned to this task. The container is set only during
+   * the ALLOCATED, LAUNCHING, RUNNING and ENDING states.
+   */
+
+  public Container container;
+
+  /**
+   * Life-cycle state of this task.
+   */
+
+  protected TaskState state;
+
+  /**
+   * True if the application has requested that the resource request or
+   * application run be cancelled. Cancelled tasks are not subject to retry.
+   */
+
+  protected boolean cancelled;
+
+  /**
+   * Disposition of a completed task: whether it was cancelled, succeeded or
+   * failed. Null until the task ends, and again after {@link #reset()}.
+   */
+
+  public Disposition disposition;
+
+  /**
+   * Error associated with the most recent failure, if any. Cleared by
+   * {@link #reset()}.
+   */
+
+  public Throwable error;
+
+  /** Number of tries made for this task so far. */
+
+  public int tryCount;
+
+  /**
+   * Completion status reported for the task's container, if any. Cleared by
+   * {@link #reset()}.
+   */
+
+  public ContainerStatus completionStatus;
+
+  /**
+   * Time stamps in ms. A value of 0 means "not set": {@link #reset()} zeroes
+   * the launch and completion times, and {@link #uptime()} treats 0 as unset.
+   */
+
+  public long launchTime;
+  public long stateStartTime;
+  public long completionTime;
+
+  /** Time in ms at which {@link #cancel()} was last called; 0 if never. */
+
+  long cancellationTime;
+
+  /**
+   * Free-form properties attached to the task. Copied (shallowly) by
+   * {@link #copy()}.
+   */
+
+  public Map<String,Object> properties = new HashMap<>( );
+
+  /**
+   * Create a new task in the START state with the next sequential task ID.
+   *
+   * @param scheduler scheduler that owns the task
+   * @param taskSpec description of the container and process for the task
+   */
+
+  public Task(Scheduler scheduler, TaskSpec taskSpec) {
+    taskId = ++taskCounter;
+    this.scheduler = scheduler;
+    this.taskSpec = taskSpec;
+    state = TaskState.START;
+    resetTrackingState();
+  }
+
+  /**
+   * Special constructor to create a static copy of the current
+   * task. The copy is placed in the completed tasks list. The copy keeps the
+   * same task ID and shares every referenced object with the original except
+   * the properties map, which is a new map holding the same entries.
+   *
+   * @param task the task to copy
+   */
+
+  private Task(Task task) {
+    taskId = task.taskId;
+    scheduler = task.scheduler;
+    taskSpec = task.taskSpec;
+    taskGroup = task.taskGroup;
+    trackingState = task.trackingState;
+    containerRequest = task.containerRequest;
+    container = task.container;
+    state = task.state;
+    cancelled = task.cancelled;
+    disposition = task.disposition;
+    error = task.error;
+    tryCount = task.tryCount;
+    completionStatus = task.completionStatus;
+    launchTime = task.launchTime;
+    stateStartTime = task.stateStartTime;
+    completionTime = task.completionTime;
+    cancellationTime = task.cancellationTime;
+    properties.putAll( task.properties );
+  }
+
+  /**
+   * Set the tracking state back to its initial value: NEW if the scheduler
+   * tracks its tasks, UNTRACKED otherwise.
+   */
+
+  public void resetTrackingState( ) {
+    trackingState = scheduler.isTracked() ? TrackingState.NEW : TrackingState.UNTRACKED;
+  }
+
+  public int getId( ) { return taskId; }
+
+  public ContainerRequestSpec getContainerSpec() { return taskSpec.containerSpec; }
+
+  public LaunchSpec getLaunchSpec() { return taskSpec.launchSpec; }
+
+  public TaskState getState() { return state; }
+
+  /**
+   * ID of the container assigned to this task. Must only be called while a
+   * container is assigned.
+   */
+
+  public ContainerId getContainerId() {
+    assert container != null;
+    return container.getId();
+  }
+
+  /**
+   * The container assigned to this task. Must only be called while a
+   * container is assigned.
+   */
+
+  public Container getContainer() {
+    assert container != null;
+    return container;
+  }
+
+  public int getTryCount() { return tryCount; }
+
+  /**
+   * A task has failed if it has a disposition and that disposition is
+   * anything other than COMPLETED.
+   */
+
+  public boolean isFailed() {
+    return disposition != null && disposition != Disposition.COMPLETED;
+  }
+
+  public Disposition getDisposition() { return disposition; }
+
+  public SchedulerStateImpl getGroup() { return taskGroup; }
+
+  public void setGroup(SchedulerStateImpl taskGroup) { this.taskGroup = taskGroup; }
+
+  /**
+   * A task may be retried unless it was cancelled or it completed normally.
+   */
+
+  public boolean retryable() {
+    return !cancelled && disposition != Disposition.COMPLETED;
+  }
+
+  public boolean isCancelled() { return cancelled; }
+
+  /**
+   * Reset the task state in preparation for a retry.
+   * Note: state reset is done by the state class.
+   */
+
+  public void reset() {
+    assert !cancelled;
+    error = null;
+    disposition = null;
+    completionStatus = null;
+    launchTime = 0;
+    completionTime = 0;
+    cancellationTime = 0;
+    container = null;
+    resetTrackingState();
+  }
+
+  /**
+   * Time in ms that the task has been, or was, running: from launch until
+   * completion, or until now if the task has not completed.
+   *
+   * @return the run time in ms, or 0 if the task has no launch time (it was
+   *         never launched, or was reset for a retry)
+   */
+
+  public long uptime() {
+
+    // Without a launch time there is nothing to measure from. Subtracting
+    // the unset value 0 would report the raw epoch time as the uptime.
+
+    if (launchTime == 0) {
+      return 0;
+    }
+    long endTime = completionTime;
+    if (endTime == 0) {
+      endTime = System.currentTimeMillis();
+    }
+    return endTime - launchTime;
+  }
+
+  /**
+   * Host of the node on which the assigned container lives.
+   *
+   * @return the host name, or null if no container is assigned
+   */
+
+  public String getHostName() {
+    if (container == null) {
+      return null;
+    }
+    return container.getNodeId().getHost();
+  }
+
+  public TrackingState getTrackingState() {
+    return trackingState;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("[id=")
+       .append(taskId)
+       .append(", type=");
+    // Scheduler is unset in some unit tests.
+    if (scheduler != null) {
+      buf.append(scheduler.getName());
+    }
+    buf.append(", name=")
+       .append(getName());
+    if (container != null) {
+      buf.append(", host=")
+         .append(getHostName());
+    }
+    buf.append(", state=")
+       .append(state.toString())
+       .append("]");
+    return buf.toString();
+  }
+
+  /**
+   * A task is live when it is in the RUNNING state and has not been
+   * cancelled.
+   */
+
+  public boolean isLive() {
+    return state == TaskState.RUNNING && !cancelled;
+  }
+
+  /**
+   * Flag the task as cancelled and record when that happened. This only
+   * updates the task's own fields.
+   */
+
+  public void cancel() {
+    cancelled = true;
+    cancellationTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Create a static copy of this task as it is right now.
+   */
+
+  public Task copy() {
+    return new Task(this);
+  }
+
+  /**
+   * Name from the task spec.
+   *
+   * @return the name, or null if the task has no spec
+   */
+
+  public String getName() {
+    return taskSpec == null ? null : taskSpec.name;
+  }
+
+  /**
+   * Label for this task displayed in log messages.
+   *
+   * @return the same text as {@link #toString()}
+   */
+
+  public String getLabel() {
+    return toString( );
+  }
+
+  public void setTrackingState(TrackingState tState) {
+    trackingState = tState;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java
new file mode 100644
index 0000000..218cd9b
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Callback interface for observers of task lifecycle changes.
+ */
+public interface TaskLifecycleListener {
+ /**
+ * Coarse lifecycle phases reported to a listener. Each task state class
+ * names one of these values when it is constructed.
+ */
+ public enum Event {
+ CREATED, ALLOCATED, RUNNING, ENDED
+ }
+
+ /**
+ * Report a lifecycle event.
+ * NOTE(review): presumably called when a task changes state, with the
+ * context identifying the task; confirm against the caller.
+ *
+ * @param event the lifecycle phase being reported
+ * @param context event context for the call
+ */
+ void stateChange(Event event, EventContext context);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java
new file mode 100644
index 0000000..4399a86
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.drill.yarn.core.ContainerRequestSpec;
+import org.apache.drill.yarn.core.LaunchSpec;
+
+/**
+ * Plain holder for the settings that describe one kind of task: the
+ * container to ask for, the process to launch, and a display name.
+ */
+public class TaskSpec {
+ /**
+ * Number of YARN vcores (virtual cores) and amount of memory (in MB) needed
+ * by this task.
+ */
+
+ public ContainerRequestSpec containerSpec;
+
+ /**
+ * Description of the task process, environment and so on.
+ */
+
+ public LaunchSpec launchSpec;
+
+ // NOTE(review): presumably the maximum number of retries allowed for a
+ // task built from this spec; nothing visible here reads it, so confirm.
+
+ public int maxRetries;
+
+ // Name of the task; Task.getName() returns this value.
+
+ public String name;
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java
new file mode 100644
index 0000000..3d52105
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java
@@ -0,0 +1,895 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.Task.Disposition;
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * Represents the behaviors associated with each state in the lifecycle
+ * of a task.
+ * <p>
+ * Startup process:
+ * <dl>
+ * <dt>START --> REQUESTING</dt>
+ * <dd>New task sends a container request to YARN.</dd>
+ * <dt>REQUESTING --> LAUNCHING</dt>
+ * <dd>Container received from YARN, launching the task's process.</dd>
+ * <dt>LAUNCHING --> RUNNING</dt>
+ * <dd>Task launched and needs no start Ack.</dd>
+ * <dt>LAUNCHING --> WAIT_START_ACK</dt>
+ * <dd>Task launched and needs a start Ack.</dd>
+ * <dt>WAIT_START_ACK --> RUNNING</dt>
+ * <dd>Start Ack received.</dd>
+ * </dl>
+ * <p>
+ * Shutdown process:
+ * <dl>
+ * <dt>RUNNING --> WAIT_END_ACK | END</dt>
+ * <dd>The resource manager reported task completion.</dd>
+ * <dt>RUNNING --> ENDING</dt>
+ * <dd>Request sent to the task for a graceful shutdown.</dd>
+ * <dt>RUNNING --> KILLING</dt>
+ * <dd>Request sent to the node manager to forcibly kill the task.</dd>
+ * <dt>ENDING --> WAIT_END_ACK | END</dt>
+ * <dd>The task gracefully exited as reported by the resource manager.</dd>
+ * <dt>ENDING --> KILLING</dt>
+ * <dd>The wait for graceful exit timed out, a forced kill message
+ * sent to the node manager.</dd>
+ * <dt>KILLING --> WAIT_END_ACK | END</dt>
+ * <dd>The task exited as reported by the resource manager.</dd>
+ * <dt>WAIT_END_ACK --> END</dt>
+ * <dd>The end-ack is received or the wait timed out.</dd>
+ * </dl>
+ * <p>
+ * This is a do-it-yourself enum. Java enum values are instances of a single
+ * class. In this version, each enum value is the sole instance of a separate
+ * class, allowing each state to have its own behavior.
+ */
+
+public abstract class TaskState {
+ /**
+  * State of a freshly created task that still needs a container. Nothing
+  * has been sent to YARN on the task's behalf yet.
+  */
+
+ private static class StartState extends TaskState {
+   protected StartState() {
+     super(false, TaskLifecycleListener.Event.CREATED, true);
+   }
+
+   /**
+    * Begin a try: count it, take the task off the pending queue and, unless
+    * the task was cancelled in the meantime, move to REQUESTING and ask
+    * YARN for a container. A cancelled task fails its start instead.
+    */
+
+   @Override
+   public void requestContainer(EventContext context) {
+     final Task pending = context.task;
+     pending.tryCount += 1;
+     context.group.dequeuePendingRequest(pending);
+     if (pending.cancelled) {
+       taskStartFailed(context, Disposition.CANCELLED);
+       return;
+     }
+     transition(context, REQUESTING);
+     context.group.enqueueAllocatingTask(pending);
+     pending.containerRequest =
+         context.yarn.requestContainer(pending.getContainerSpec());
+   }
+
+   /**
+    * Cancelling here is simple: YARN knows nothing about the task yet, so
+    * the task is just taken off the pending queue and failed as CANCELLED.
+    */
+
+   @Override
+   public void cancel(EventContext context) {
+     final Task pending = context.task;
+     assert !pending.cancelled;
+     context.group.dequeuePendingRequest(pending);
+     pending.cancel();
+     taskStartFailed(context, Disposition.CANCELLED);
+   }
+ }
+
+ /**
+  * Task for which a container request has been sent but not yet received.
+  */
+
+ private static class RequestingState extends TaskState {
+   protected RequestingState() {
+     super(false, TaskLifecycleListener.Event.CREATED, true);
+   }
+
+   /**
+    * Handle REQUESTING --> LAUNCHING. Indicates that we've asked YARN to start
+    * the task on the allocated container. If the task was cancelled while
+    * waiting, the container is handed straight back instead. If the launch
+    * request itself fails, the task fails its start with LAUNCH_FAILED.
+    *
+    * @param context event context for the task
+    * @param container the container YARN allocated for the task
+    */
+
+   @Override
+   public void containerAllocated(EventContext context, Container container) {
+     Task task = context.task;
+     LOG.info(task.getLabel() + " - Received container: "
+         + DoYUtil.describeContainer(container));
+     context.group.dequeueAllocatingTask(task);
+
+     // No matter what happens below, we don't want to ask for this
+     // container again. The RM async API is a bit bizarre in this
+     // regard: it will keep asking for container over and over until
+     // we tell it to stop.
+
+     context.yarn.removeContainerRequest(task.containerRequest);
+
+     // The container is needed both in the normal and in the cancellation
+     // path, so set it here.
+
+     task.container = container;
+     if (task.cancelled) {
+       context.yarn.releaseContainer(container);
+       taskStartFailed(context, Disposition.CANCELLED);
+       return;
+     }
+     task.error = null;
+     task.completionStatus = null;
+     transition(context, LAUNCHING);
+
+     // The pool that manages this task wants to know that we have
+     // a container. The task manager may want to do some task-
+     // specific setup.
+
+     context.group.containerAllocated(context.task);
+     context.getTaskManager().allocated(context);
+
+     // Go ahead and launch a task in the container using the launch
+     // specification provided by the task group (pool).
+
+     try {
+       context.yarn.launchContainer(container, task.getLaunchSpec());
+       task.launchTime = System.currentTimeMillis();
+     } catch (YarnFacadeException e) {
+       LOG.error("Container launch failed: " + task.getContainerId(), e);
+
+       // This may not be the right response. RM may still think
+       // we have the container if the above is a local failure.
+
+       task.error = e;
+       context.group.containerReleased(task);
+       task.container = null;
+       taskStartFailed(context, Disposition.LAUNCH_FAILED);
+     }
+   }
+
+   /**
+    * Cancel the container request. The task is marked as cancelled, the
+    * outstanding request is withdrawn from YARN and the task moves directly
+    * to END with the CANCELLED disposition; there is no wait for YARN to
+    * answer.
+    */
+
+   @Override
+   public void cancel(EventContext context) {
+     Task task = context.task;
+     task.cancel();
+     LOG.info(task.getLabel() + " - Cancelled at user request");
+     abandonRequest(context, Disposition.CANCELLED);
+   }
+
+   /**
+    * The task is requesting a container. If the request takes too long,
+    * cancel the request and shrink the target task count. This event
+    * generally indicates that the user wants to run more tasks than
+    * the cluster has capacity. A request timeout of 0 disables the check.
+    *
+    * @param context event context for the task
+    * @param curTime current time in ms
+    */
+
+   @Override
+   public void tick(EventContext context, long curTime) {
+     Task task = context.task;
+     int timeoutSec = task.scheduler.getRequestTimeoutSec( );
+     if (timeoutSec == 0) {
+       return;
+     }
+
+     // Convert in long arithmetic: an int product overflows once the
+     // timeout exceeds Integer.MAX_VALUE / 1000 seconds.
+
+     long timeoutMs = timeoutSec * 1000L;
+     if (task.stateStartTime + timeoutMs > curTime) {
+       return;
+     }
+     LOG.info(task.getLabel() + " - Request timed out after "
+         + timeoutSec + " secs.");
+     abandonRequest(context, Disposition.LAUNCH_FAILED);
+     task.scheduler.requestTimedOut();
+   }
+
+   /**
+    * Withdraw the outstanding container request and end the task with the
+    * given disposition. Shared by the cancel and time-out paths.
+    */
+
+   private void abandonRequest(EventContext context, Disposition disposition) {
+     Task task = context.task;
+     context.yarn.removeContainerRequest(task.containerRequest);
+     context.group.dequeueAllocatingTask(task);
+     task.disposition = disposition;
+     task.completionTime = System.currentTimeMillis();
+     transition(context, END);
+     context.group.taskEnded(task);
+   }
+ }
+
+ /**
+ * Task for which a container has been allocated and the task launch request
+ * sent. Awaiting confirmation that the task is running.
+ */
+
+ private static class LaunchingState extends TaskState {
+ protected LaunchingState() {
+ super(true, TaskLifecycleListener.Event.ALLOCATED, true);
+ }
+
+ /**
+ * Handle launch failure. Results in a LAUNCHING --> END transition or
+ * restart.
+ * <p>
+ * This situation can occur, when debugging, if a timeout occurs after the
+ * allocation message, such as when, sitting in the debugger on the
+ * allocation event.
+ */
+
+ @Override
+ public void launchFailed(EventContext context, Throwable t) {
+ Task task = context.task;
+ LOG.info(task.getLabel() + " - Container start failed");
+ context.task.error = t;
+ launchFailed(context);
+ }
+
+ /**
+ * Handle LAUNCHING --> RUNNING/START_ACK. Indicates that YARN has confirmed
+ * that the task is, indeed, running.
+ */
+
+ @Override
+ public void containerStarted(EventContext context) {
+ Task task = context.task;
+
+ // If this task is tracked (that is, it is a Drillbit which
+ // we monitor using ZK) then we have to decide if we've
+ // seen the task in the tracker yet. If we have, then the
+ // task is fully running. If we haven't, then we need to
+ // wait for the start acknowledgement.
+
+ if (task.trackingState == Task.TrackingState.NEW) {
+ transition(context, WAIT_START_ACK);
+ } else {
+ transition(context, RUNNING);
+ }
+ task.error = null;
+
+ // If someone came along and marked the task as cancelled,
+ // we are now done waiting for YARN so we can immediately
+ // turn around and kill the task. (Can't kill the task,
+ // however, until YARN starts it, hence the need to wait
+ // for YARN to start the task before killing it.)
+
+ if (task.cancelled) {
+ transition(context, KILLING);
+ context.yarn.killContainer(task.getContainer());
+ }
+ }
+
+ /**
+ * Out-of-order start ACK, perhaps due to network latency. Handle by staying
+ * in this state, but later jump directly<br>
+ * LAUNCHING --> RUNNING
+ */
+
+ @Override
+ public void startAck(EventContext context) {
+ context.task.trackingState = Task.TrackingState.START_ACK;
+ }
+
+ @Override
+ public void containerCompleted(EventContext context,
+ ContainerStatus status) {
+ // Seen on Mac when putting machine to sleep.
+ // Handle by failing & retrying.
+ completed(context, status);
+ endOrAck(context);
+ }
+
+ @Override
+ public void cancel(EventContext context) {
+ context.task.cancel();
+ context.yarn.killContainer(context.task.getContainer());
+ }
+
+ @Override
+ public void tick(EventContext context, long curTime) {
+
+ // If we are canceling the task, and YARN has not reported container
+ // completion after some amount of time, just force failure.
+
+ Task task = context.task;
+ if (task.isCancelled()
+ && task.cancellationTime + Task.MAX_CANCELLATION_TIME < curTime) {
+ LOG.error(task.getLabel() + " - Launch timed out after "
+ + Task.MAX_CANCELLATION_TIME / 1000 + " secs.");
+ launchFailed(context);
+ }
+ }
+
+ private void launchFailed(EventContext context) {
+ Task task = context.task;
+ task.completionTime = System.currentTimeMillis();
+
+ // Not sure if releasing the container is needed...
+
+ context.yarn.releaseContainer(task.container);
+ context.group.containerReleased(task);
+ task.container = null;
+ taskStartFailed(context, Disposition.LAUNCH_FAILED);
+ }
+ }
+
+ /**
+  * State of a launched, tracked task for which the start ack has not
+  * arrived yet.
+  */
+
+ private static class WaitStartAckState extends TaskState {
+   protected WaitStartAckState() {
+     super(true, TaskLifecycleListener.Event.RUNNING, true);
+   }
+
+   /**
+    * The awaited ack arrived: record it, then WAIT_START_ACK --> RUNNING.
+    */
+
+   @Override
+   public void startAck(EventContext context) {
+     final Task waiting = context.task;
+     waiting.trackingState = Task.TrackingState.START_ACK;
+     transition(context, RUNNING);
+   }
+
+   /**
+    * Cancellation is handled exactly as in the RUNNING state.
+    */
+
+   @Override
+   public void cancel(EventContext context) {
+     RUNNING.cancel(context);
+   }
+
+   /**
+    * The container completed while the ack was still outstanding: record
+    * the completion and terminate the task.
+    */
+
+   @Override
+   public void containerCompleted(EventContext context,
+       ContainerStatus status) {
+     completed(context, status);
+     taskTerminated(context);
+   }
+
+   // TODO: Timeout in this state.
+ }
+
+ /**
+  * State of a task that is running normally.
+  */
+
+ private static class RunningState extends TaskState {
+   protected RunningState() {
+     super(true, TaskLifecycleListener.Event.RUNNING, true);
+   }
+
+   /**
+    * The container completed while the task was running. Records the
+    * completion, then continues to the end or to the end-ack wait.
+    *
+    * @param status completion status reported for the container
+    */
+
+   @Override
+   public void containerCompleted(EventContext context,
+       ContainerStatus status) {
+     completed(context, status);
+     endOrAck(context);
+   }
+
+   /**
+    * Flag the task as cancelled and stop it. A graceful stop is tried
+    * first (RUNNING --> ENDING); if no stop request went out, the
+    * container is killed instead (RUNNING --> KILLING).
+    */
+
+   @Override
+   public void cancel(EventContext context) {
+     final Task running = context.task;
+     running.cancel();
+     final boolean stopRequested = context.group.requestStop(running);
+     if (!stopRequested) {
+       context.yarn.killContainer(running.container);
+       transition(context, KILLING);
+       return;
+     }
+     transition(context, ENDING);
+   }
+
+   /**
+    * The task claims that it is complete, but we think it is running.
+    * Assume that the task has started its own graceful shutdown (or the
+    * equivalent).<br>
+    * RUNNING --> ENDING
+    */
+
+   @Override
+   public void completionAck(EventContext context) {
+     final Task running = context.task;
+     running.trackingState = Task.TrackingState.END_ACK;
+     transition(context, ENDING);
+   }
+ }
+
+ /**
+  * State of a task that has been asked to terminate but whose completion
+  * the Node Manager has not confirmed yet. (Graceful termination is not
+  * yet supported in the Drill-bit.)
+  */
+
+ public static class EndingState extends TaskState {
+   protected EndingState() {
+     super(true, TaskLifecycleListener.Event.RUNNING, false);
+   }
+
+   /**
+    * Normal ENDING --> WAIT_END_ACK | END transition.
+    *
+    * @param status completion status reported for the container
+    */
+
+   @Override
+   public void containerCompleted(EventContext context,
+       ContainerStatus status) {
+     completed(context, status);
+     endOrAck(context);
+   }
+
+   /**
+    * The task is already stopping, so only the cancelled flag is set.
+    */
+
+   @Override
+   public void cancel(EventContext context) {
+     final Task ending = context.task;
+     ending.cancel();
+   }
+
+   /**
+    * Once the graceful stop has run longer than the group's stop timeout,
+    * forcibly kill the process: ENDING --> KILLING.
+    */
+
+   @Override
+   public void tick(EventContext context, long curTime) {
+     final Task ending = context.task;
+     final long elapsed = curTime - ending.stateStartTime;
+     if (elapsed <= ending.taskGroup.getStopTimeoutMs()) {
+       return;
+     }
+     context.yarn.killContainer(ending.container);
+     transition(context, KILLING);
+   }
+
+   /**
+    * Record the end ack; the state itself does not change.
+    */
+
+   @Override
+   public void completionAck(EventContext context) {
+     final Task ending = context.task;
+     ending.trackingState = Task.TrackingState.END_ACK;
+   }
+ }
+
+ /**
+ * Task for which a forced termination request has been sent to the Node
+ * Manager, but a stop message has not yet been received. Acks that arrive
+ * in this state are recorded on the task without changing the state.
+ */
+
+ public static class KillingState extends TaskState {
+ protected KillingState() { super(true, TaskLifecycleListener.Event.RUNNING, false); }
+
+ /*
+ * Normal KILLING --> WAIT_COMPLETE transition, awaiting Resource Manager
+ * confirmation.
+ */
+
+// @Override
+// public void containerStopped(EventContext context) {
+// transition(context, WAIT_COMPLETE);
+// }
+
+ /**
+ * Normal KILLING --> WAIT_END_ACK | END transition. Records the completion
+ * and then either ends the task or waits for the end ack.
+ *
+ * @param status completion status reported for the container
+ */
+
+ @Override
+ public void containerCompleted(EventContext context,
+ ContainerStatus status) {
+ completed(context, status);
+ endOrAck(context);
+ }
+
+ /**
+ * The kill request is already out, so only the cancelled flag is set.
+ */
+
+ @Override
+ public void cancel(EventContext context) {
+ context.task.cancel();
+ }
+
+ @Override
+ public void startAck(EventContext context) {
+ // Better late than never... Happens during debugging sessions
+ // when order of messages is scrambled.
+
+ context.task.trackingState = Task.TrackingState.START_ACK;
+ }
+
+ /**
+ * Record the end ack; the state itself does not change.
+ */
+
+ @Override
+ public void completionAck(EventContext context) {
+ context.task.trackingState = Task.TrackingState.END_ACK;
+ }
+
+ /**
+ * Not expected in this state: trips an assertion when assertions are
+ * enabled and is otherwise ignored.
+ */
+
+ @Override
+ public void stopTaskFailed(EventContext context, Throwable t) {
+ assert false;
+ // What to do?
+ }
+ }
+
+ /**
+ * Task exited, but we are waiting for confirmation from Zookeeper that
+ * the Drillbit registration has been removed. Required to associate
+ * ZK registrations with Drillbits. Ensures that we don't try to
+ * start a new Drillbit on a node until the previous Drillbit
+ * completely shut down, including dropping out of ZK.
+ */
+
+ private static class WaitEndAckState extends TaskState {
+ protected WaitEndAckState() {
+ super(false, TaskLifecycleListener.Event.RUNNING, false);
+ }
+
+ @Override
+ public void cancel(EventContext context) {
+ context.task.cancel();
+ }
+
+ @Override
+ public void completionAck(EventContext context) {
+ context.task.trackingState = Task.TrackingState.END_ACK;
+ taskTerminated(context);
+ }
+
+ /**
+ * Periodically check if the process is still live. We are supposed to
+ * receive events when the task becomes deregistered. But, we've seen
+ * cases where the task hangs in this state forever. Try to resolve
+ * the issue by polling periodically.
+ */
+
+ @Override
+ public void tick(EventContext context, long curTime) {
+ if(! context.getTaskManager().isLive(context)){
+ taskTerminated(context);
+ }
+ }
+ }
+
+ /**
+  * Terminal state for a task that completed or failed; the task's
+  * disposition field records which. The task is no longer active on YARN,
+  * but could be retried.
+  */
+
+ private static class EndState extends TaskState {
+   protected EndState() {
+     super(false, TaskLifecycleListener.Event.ENDED, false);
+   }
+
+   /*
+    * Ignore out-of-order Node Manager completion notices. Currently disabled.
+    */
+
+   // @Override
+   // public void containerStopped(EventContext context) {
+   // }
+
+   /**
+    * Cancelling an ended task is a no-op.
+    */
+
+   @Override
+   public void cancel(EventContext context) {
+     // Nothing to do: the task has already ended.
+   }
+ }
+
+ // Logger shared by all states; used for transition and error messages.
+ private static final Log LOG = LogFactory.getLog(TaskState.class);
+
+ // One shared instance per state. State code refers to these constants
+ // when transitioning, e.g. transition(context, KILLING).
+ public static final TaskState START = new StartState();
+ public static final TaskState REQUESTING = new RequestingState();
+ public static final TaskState LAUNCHING = new LaunchingState();
+ public static final TaskState WAIT_START_ACK = new WaitStartAckState();
+ public static final TaskState RUNNING = new RunningState();
+ public static final TaskState ENDING = new EndingState();
+ public static final TaskState KILLING = new KillingState();
+ public static final TaskState WAIT_END_ACK = new WaitEndAckState();
+ public static final TaskState END = new EndState();
+
+ // Value returned by hasContainer().
+ // NOTE(review): presumably whether a task in this state holds a YARN container - confirm.
+ protected final boolean hasContainer;
+ // Life cycle event associated with this state; transition() fires a life
+ // cycle change only when the old and new states differ in this value.
+ protected final TaskLifecycleListener.Event lifeCycleEvent;
+ // Display label derived from the class name in the constructor.
+ protected final String label;
+ // Value returned by isCancellable().
+ protected final boolean cancellable;
+
+ /**
+  * Creates a state and derives its display label from the concrete class
+  * name: the "State" suffix is dropped, camel-case words are joined with
+  * underscores, and the result is upper-cased. For example,
+  * WaitEndAckState yields WAIT_END_ACK.
+  *
+  * @param hasContainer value later returned by {@link #hasContainer()}
+  * @param lcEvent life cycle event associated with this state
+  * @param cancellable value later returned by {@link #isCancellable()}
+  */
+
+ public TaskState(boolean hasContainer, TaskLifecycleListener.Event lcEvent,
+     boolean cancellable) {
+   this.hasContainer = hasContainer;
+   lifeCycleEvent = lcEvent;
+   this.cancellable = cancellable;
+   String name = toString();
+   name = name.replace("State", "");
+   name = name.replaceAll("([a-z]+)([A-Z])", "$1_$2");
+   // Use Locale.ROOT so the label does not depend on the JVM's default
+   // locale: under Turkish or Azeri locales the no-argument overload maps
+   // 'i' to a dotted capital I, corrupting labels such as KILLING.
+   label = name.toUpperCase(java.util.Locale.ROOT);
+ }
+
+ /**
+  * Chooses the next step once a task's container has completed. If the
+  * task's tracking state is START_ACK, the task moves to WAIT_END_ACK;
+  * otherwise it is terminated immediately.
+  *
+  * @param context event context for the completed task
+  */
+
+ protected void endOrAck(EventContext context) {
+   boolean startAcked =
+       context.task.trackingState == Task.TrackingState.START_ACK;
+   if (!startAcked) {
+     taskTerminated(context);
+     return;
+   }
+   transition(context, WAIT_END_ACK);
+ }
+
+ /**
+ * Event requesting a container for the task. The default implementation
+ * reports the event as illegal for this state; states that accept the
+ * event override this method.
+ *
+ * @param context event context for the task
+ */
+
+ public void requestContainer(EventContext context) {
+ illegalState(context, "requestContainer");
+ }
+
+ /**
+ * Resource Manager reports that the task has been allocated a container.
+ * The default implementation reports the event as illegal for this state;
+ * states that accept the event override this method.
+ *
+ * @param context event context for the task
+ * @param container the container allocated to the task
+ */
+
+ public void containerAllocated(EventContext context, Container container) {
+ illegalState(context, "containerAllocated");
+ }
+
+ /**
+ * The launch of the container failed. The default implementation reports
+ * the event as illegal for this state; states that accept the event
+ * override this method.
+ *
+ * @param context event context for the task
+ * @param t the failure reported for the launch (unused by this default)
+ */
+
+ public void launchFailed(EventContext context, Throwable t) {
+ illegalState(context, "launchFailed");
+ }
+
+ /**
+ * Node Manager reports that the task has started execution. The default
+ * implementation reports the event as illegal for this state; states that
+ * accept the event override this method.
+ *
+ * @param context event context for the task
+ */
+
+ public void containerStarted(EventContext context) {
+ illegalState(context, "containerStarted");
+ }
+
+ /**
+ * The monitoring plugin has detected that the task has confirmed that it is
+ * fully started. The default implementation reports the event as illegal
+ * for this state; states that accept the event override this method.
+ *
+ * @param context event context for the task
+ */
+
+ public void startAck(EventContext context) {
+ illegalState(context, "startAck");
+ }
+
+ /**
+ * The node manager request to stop a task failed. The default
+ * implementation reports the event as illegal for this state; states that
+ * accept the event override this method.
+ *
+ * @param context event context for the task
+ * @param t the failure reported for the stop request (unused by this default)
+ */
+
+ public void stopTaskFailed(EventContext context, Throwable t) {
+ illegalState(context, "stopTaskFailed");
+ }
+
+ /**
+ * The monitoring plugin has detected that the task has confirmed that it has
+ * started shutdown. The default implementation reports the event as illegal
+ * for this state; states that accept the event override this method.
+ *
+ * @param context event context for the task
+ */
+
+ public void completionAck(EventContext context) {
+ illegalState(context, "completionAck");
+ }
+
+ /**
+ * Node Manager reports that the task has stopped execution. We don't yet know
+ * if this was a success or failure. The default implementation reports the
+ * event as illegal for this state.
+ *
+ * @param context event context for the task
+ */
+
+ public void containerStopped(EventContext context) {
+ illegalState(context, "containerStopped");
+ }
+
+ /**
+ * Resource Manager reports that the task has completed execution and provided
+ * the completion status. The default implementation still records the
+ * status on the task, then reports the event as illegal for this state.
+ *
+ * @param context event context for the task
+ * @param status completion status reported for the task's container
+ */
+
+ public void containerCompleted(EventContext context, ContainerStatus status) {
+ // Keep the status even though the event was unexpected in this state.
+ completed(context, status);
+ illegalState(context, "containerCompleted");
+ }
+
+ /**
+ * Cluster manager wishes to cancel this task. The default implementation
+ * reports the event as illegal for this state; states that accept the
+ * event override this method.
+ *
+ * @param context event context for the task
+ */
+
+ public void cancel(EventContext context) {
+ illegalState(context, "cancel");
+ }
+
+ /**
+ * Periodic timer event. Does nothing by default; states that implement a
+ * timeout or polling override this method.
+ *
+ * @param context event context for the task
+ * @param curTime current time supplied by the caller
+ */
+
+ public void tick(EventContext context, long curTime) {
+ // Ignore by default
+ }
+
+ /**
+  * Moves the task to a new state. Logs the change, notifies life cycle
+  * listeners when the life cycle event of the new state differs from that
+  * of the old one, and finally stamps the start time of the new state so
+  * that states with a timeout can measure against it.
+  *
+  * @param context event context holding the task to move
+  * @param newState the state to enter
+  */
+
+ protected void transition(EventContext context, TaskState newState) {
+   Task task = context.task;
+   TaskState oldState = task.state;
+   String from = oldState.toString();
+   String to = newState.toString();
+   LOG.info(task.getLabel() + " " + from + " --> " + to);
+   task.state = newState;
+
+   boolean lifeCycleChanged = newState.lifeCycleEvent != oldState.lifeCycleEvent;
+   if (lifeCycleChanged) {
+     context.controller.fireLifecycleChange(newState.lifeCycleEvent, context);
+   }
+
+   // Stamp the time only after listeners have run, as before.
+   context.task.stateStartTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Task failed when starting. No container has been allocated. The task
+ * will go from:<br>
+ * * --> END
+ * <p>
+ * If the run failed, and the task can be retried, it may
+ * then move from<br>
+ * END --> START
+ *
+ * @param context event context for the task that failed to start
+ * @param disposition disposition to record on the task
+ */
+
+ protected void taskStartFailed(EventContext context,
+ Disposition disposition) {
+
+ // No container is expected at this point (checked by the assertion below).
+ // NOTE(review): the original comment said "don't alert the task manager",
+ // yet the task manager's completed() is called below - confirm intent.
+
+ assert context.task.container == null;
+
+ context.getTaskManager().completed(context);
+ // Move to END with the given disposition, then let the retry logic decide.
+ taskEnded(context, disposition);
+ retryTask(context);
+ }
+
+ /**
+  * Wraps up a task whose container has finished, using the recorded exit
+  * status to tell success from failure.
+  * <p>
+  * The task always moves to END. An exit status of 0 counts as success and
+  * the task is reported to its group as ended. Any other status marks the
+  * run as failed and hands the task to the retry logic, which may move it
+  * from END back to START.
+  *
+  * @param context event context for the finished task
+  */
+
+ protected void taskTerminated(EventContext context) {
+   Task task = context.task;
+
+   // Let the task manager look at the completed task first. It can
+   // override retry behavior: calling cancel( ) on the task prevents a
+   // retry that would otherwise happen.
+
+   context.getTaskManager().completed(context);
+   context.group.containerReleased(task);
+
+   assert task.completionStatus != null;
+   boolean succeeded = task.completionStatus.getExitStatus() == 0;
+   if (!succeeded) {
+     taskEnded(context, Disposition.RUN_FAILED);
+     retryTask(context);
+     return;
+   }
+   taskEnded(context, Disposition.COMPLETED);
+   context.group.taskEnded(context.task);
+ }
+
+ /**
+  * Marks a task as ended: records the disposition and completion time,
+  * then transitions to END. The task is deliberately not deregistered from
+  * the scheduler state here, since it may still be retried.
+  *
+  * @param context event context for the ended task
+  * @param disposition disposition to record, or null to keep the one
+  *          already set on the task
+  */
+
+ private void taskEnded(EventContext context, Disposition disposition) {
+   Task ended = context.task;
+   if (disposition != null) {
+     ended.disposition = disposition;
+   } else {
+     // No disposition supplied, so one must already be recorded.
+     assert ended.disposition != null;
+   }
+   ended.completionTime = System.currentTimeMillis();
+   transition(context, END);
+ }
+
+ /**
+  * Retries a task, or reports it to its group as ended if no retry is
+  * possible. The task must already be in the END state so that state
+  * transitions stay clean. No retry happens when the controller is no
+  * longer live, the task is not retryable, or the try count exceeds the
+  * group's maximum; otherwise the task is reset, moved to START and queued
+  * as a pending request.
+  *
+  * @param context event context for the task to retry
+  */
+
+ private void retryTask(EventContext context) {
+   Task task = context.task;
+   assert task.state == END;
+
+   boolean retryAllowed = context.controller.isLive() && task.retryable();
+   if (!retryAllowed) {
+     context.group.taskEnded(task);
+   } else if (task.tryCount > task.taskGroup.getMaxRetries()) {
+     LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount);
+     task.disposition = Disposition.TOO_MANY_RETRIES;
+     context.group.taskEnded(task);
+   } else {
+     LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount);
+     context.group.taskRetried(task);
+     task.reset();
+     transition(context, START);
+     context.group.enqueuePendingRequest(task);
+   }
+ }
+
+ /**
+  * An event arrived in a state where it is not expected. Log it, then fail
+  * an assertion when assertions are enabled; otherwise ignore the event and
+  * hope it goes away.
+  *
+  * @param context event context for the task that received the event
+  * @param action name of the unexpected event, used in the log message
+  */
+
+ private void illegalState(EventContext context, String action) {
+   String msg = context.task.getLabel() + " - Action " + action
+       + " in wrong state: " + toString();
+
+   // Log before asserting so the offending action and state are recorded
+   // even when assertions are enabled and the assert aborts the call.
+
+   LOG.error(msg, new IllegalStateException("Action in wrong state"));
+
+   // Intentionally assert: fails during debugging, soldiers on in production.
+
+   assert false : msg;
+ }
+
+ /**
+  * Records the container completion status on the task and, when trace
+  * logging is enabled, logs the exit status along with any non-blank
+  * diagnostics.
+  *
+  * @param context event context for the completed task
+  * @param status completion status to store on the task
+  */
+
+ protected void completed(EventContext context, ContainerStatus status) {
+   Task task = context.task;
+   if (LOG.isTraceEnabled()) {
+     // Build the message only when it will be emitted, and read the
+     // diagnostics once.
+     String diag = status.getDiagnostics();
+     LOG.trace(
+         task.getLabel() + " Completed, exit status: " + status.getExitStatus()
+             + (DoYUtil.isBlank(diag) ? "" : ": " + diag));
+   }
+   task.completionStatus = status;
+ }
+
+ /**
+ * Returns the simple name of the concrete state class. The constructor
+ * derives the state's label from this value.
+ */
+
+ @Override
+ public String toString() { return getClass().getSimpleName(); }
+
+ /**
+ * Returns the hasContainer flag given to the constructor.
+ * NOTE(review): presumably true when a task in this state holds a container - confirm.
+ */
+
+ public boolean hasContainer() { return hasContainer; }
+
+ /**
+ * Returns the upper-case label derived from the state's class name in the
+ * constructor.
+ */
+
+ public String getLabel() { return label; }
+
+ /**
+ * Returns the cancellable flag given to the constructor.
+ */
+
+ public boolean isCancellable() {
+ return cancellable;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java
new file mode 100644
index 0000000..c90d4f8
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Callback interface with a single method that receives a {@link Task}.
+ * NOTE(review): presumably used to apply an operation to each task in a
+ * collection - confirm against callers.
+ */
+
+public interface TaskVisitor {
+ /**
+ * Called with the task to process.
+ *
+ * @param task the task being visited
+ */
+ void visit(Task task);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java
new file mode 100644
index 0000000..8ac0a5d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster;
+
+/**
+ * Exceptions thrown from the YARN facade: the wrapper around the YARN AM
+ * interfaces. This is a checked exception that always carries an underlying
+ * cause.
+ */
+
+// Serial warning suppressed: no serialVersionUID is declared.
+@SuppressWarnings("serial")
+public class YarnFacadeException extends Exception {
+ /**
+ * Creates the exception with a message and the underlying cause.
+ *
+ * @param msg description of the failure
+ * @param e the exception that caused the failure
+ */
+ public YarnFacadeException(String msg, Exception e) {
+ super(msg, e);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/http/AMSecurityManager.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/http/AMSecurityManager.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/http/AMSecurityManager.java
new file mode 100644
index 0000000..fbca171
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/http/AMSecurityManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.appMaster.http;
+
+/**
+ * Security manager for the Application Master. Allows a variety
+ * of security systems, including Drill's user authentication
+ * and DoY's static user/password, or an open AM web UI.
+ */
+
+public interface AMSecurityManager {
+ // Prepares the security manager.
+ // NOTE(review): presumably called once before any other method - confirm.
+ void init();
+
+ // Reports whether this security system requires a login.
+ // NOTE(review): presumably false for the open AM web UI case - confirm.
+ boolean requiresLogin();
+
+ // Attempts a login with the given user name and password.
+ // NOTE(review): presumably returns true when the credentials are accepted - confirm.
+ boolean login(String user, String password);
+
+ // Shuts down the security manager.
+ // NOTE(review): presumably releases whatever init() set up - confirm.
+ void close();
+}