You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:14 UTC
[06/13] tajo git commit: TAJO-1288: Refactoring
org.apache.tajo.master package.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
new file mode 100644
index 0000000..351856f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -0,0 +1,926 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.engine.query.TaskRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.plan.serder.LogicalNodeSerializer;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class DefaultTaskScheduler extends AbstractTaskScheduler {
+ private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
+
+ private final TaskSchedulerContext context;
+ private Stage stage;
+
+ private Thread schedulingThread;
+ private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
+
+ private ScheduledRequests scheduledRequests;
+ private TaskRequests taskRequests;
+
+ private int nextTaskId = 0;
+ private int scheduledObjectNum = 0;
+
+ /**
+ * Creates a scheduler for one stage. The service name passed to the superclass is this class's name.
+ *
+ * @param context scheduler context kept for later use by this scheduler
+ * @param stage the stage for which tasks are created and assigned
+ */
+ public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) {
+ super(DefaultTaskScheduler.class.getName());
+ this.context = context;
+ this.stage = stage;
+ }
+
+ /**
+ * Creates the empty bookkeeping structures for scheduled tasks and for incoming task requests,
+ * then delegates to the superclass initialization.
+ *
+ * @param conf configuration forwarded to {@code super.init}
+ */
+ @Override
+ public void init(Configuration conf) {
+
+ scheduledRequests = new ScheduledRequests();
+ taskRequests = new TaskRequests();
+
+ super.init(conf);
+ }
+
+ /**
+  * Starts the background scheduling thread and then the service itself.
+  *
+  * The thread repeatedly waits on its own monitor for up to 100 ms (or until it is notified) and
+  * then runs {@link #schedule()}. It ends when the scheduler has been stopped, when the thread is
+  * interrupted, or when a scheduling pass throws; only the last case is logged as fatal.
+  */
+ @Override
+ public void start() {
+   LOG.info("Start TaskScheduler");
+
+   this.schedulingThread = new Thread() {
+     @Override
+     public void run() {
+       try {
+         while (!(stopEventHandling.get() || Thread.currentThread().isInterrupted())) {
+           synchronized (schedulingThread) {
+             schedulingThread.wait(100);
+           }
+           schedule();
+         }
+       } catch (InterruptedException e) {
+         // interrupted while waiting: leave the loop without logging
+       } catch (Throwable e) {
+         LOG.fatal(e.getMessage(), e);
+       }
+       LOG.info("TaskScheduler schedulingThread stopped");
+     }
+   };
+
+   this.schedulingThread.start();
+   super.start();
+ }
+
+ /** Attempt id derived from {@code QueryIdFactory.NULL_QUERY_ID}; used as the id of the stop request. */
+ private static final TaskAttemptId NULL_ATTEMPT_ID;
+ /** Pre-built request with {@code shouldDie} set; it is handed to request callbacks instead of a task. */
+ public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
+ static {
+   final ExecutionBlockId nullBlockId = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+   NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullBlockId, 0), 0);
+
+   stopTaskRunnerReq = TajoWorkerProtocol.TaskRequestProto.newBuilder()
+       .setId(NULL_ATTEMPT_ID.getProto())
+       .setShouldDie(true)
+       .setOutputTable("")
+       .setPlan(PlanProto.LogicalNodeTree.newBuilder())
+       .setClusteredOutput(false)
+       .build();
+ }
+
+ /**
+  * Stops the scheduler. Only the first call has an effect; later calls return immediately.
+  *
+  * The scheduling thread is woken up so that it can observe the stop flag, and every task request
+  * still waiting in the queue is answered with {@link #stopTaskRunnerReq}.
+  */
+ @Override
+ public void stop() {
+   if (stopEventHandling.getAndSet(true)) {
+     return;
+   }
+
+   if (schedulingThread != null) {
+     synchronized (schedulingThread) {
+       schedulingThread.notifyAll();
+     }
+   }
+
+   // Return all of request callbacks instantly.
+   // The requests are polled off the queue rather than iterated in place: the scheduling thread may
+   // still be draining the same queue, and a request left in the queue could otherwise be answered
+   // here and then answered a second time with a task assignment.
+   if (taskRequests != null) {
+     TaskRequestEvent pending;
+     while ((pending = taskRequests.taskRequestQueue.poll()) != null) {
+       pending.getCallback().run(stopTaskRunnerReq);
+     }
+   }
+
+   LOG.info("Task Scheduler stopped");
+   super.stop();
+ }
+
+ private Fragment[] fragmentsForNonLeafTask;
+ private Fragment[] broadcastFragmentsForNonLeafTask;
+
+ LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>();
+ /**
+  * Runs one scheduling pass: first hands pending task requests to leaf tasks, then to non-leaf tasks.
+  *
+  * In each phase at most as many requests are taken from the request queue as there are tasks of
+  * that kind waiting. The shared {@code taskRequestEvents} buffer is cleared after each phase.
+  */
+ public void schedule() {
+   // leaf tasks
+   if (taskRequests.size() > 0 && scheduledRequests.leafTaskNum() > 0) {
+     LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+         taskRequests.size() + ", LeafTask Schedule Request: " +
+         scheduledRequests.leafTaskNum());
+     taskRequests.getTaskRequests(taskRequestEvents, scheduledRequests.leafTaskNum());
+     LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+     if (taskRequestEvents.size() > 0) {
+       scheduledRequests.assignToLeafTasks(taskRequestEvents);
+       taskRequestEvents.clear();
+     }
+   }
+
+   // non-leaf tasks
+   if (taskRequests.size() > 0 && scheduledRequests.nonLeafTaskNum() > 0) {
+     LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+         taskRequests.size() + ", NonLeafTask Schedule Request: " +
+         scheduledRequests.nonLeafTaskNum());
+     taskRequests.getTaskRequests(taskRequestEvents, scheduledRequests.nonLeafTaskNum());
+     scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
+     taskRequestEvents.clear();
+   }
+ }
+
+ /**
+ * Handles scheduler events for this stage.
+ * <ul>
+ * <li>{@code T_SCHEDULE} with a {@link FragmentScheduleEvent}: for a leaf query, creates a task from the
+ * event's fragments and sends {@code TaskEventType.T_SCHEDULE} for it; otherwise only remembers the
+ * fragments for tasks created by later fetch events.</li>
+ * <li>{@code T_SCHEDULE} with a {@link FetchScheduleEvent}: creates a task holding the fetches plus the
+ * remembered fragments and sends {@code TaskEventType.T_SCHEDULE} for it.</li>
+ * <li>{@code T_SCHEDULE} with a {@link TaskAttemptToSchedulerEvent}: registers the attempt as a leaf or
+ * non-leaf task waiting for a task request.</li>
+ * <li>{@code T_SCHEDULE_CANCEL}: removes the attempt from the leaf task set and notifies the attempt
+ * with {@code TA_SCHEDULE_CANCELED}.</li>
+ * </ul>
+ *
+ * @param event the scheduler event to process
+ */
+ @Override
+ public void handle(TaskSchedulerEvent event) {
+ if (event.getType() == EventType.T_SCHEDULE) {
+ if (event instanceof FragmentScheduleEvent) {
+ FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
+ if (context.isLeafQuery()) {
+ TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext();
+ Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
+ task.addFragment(castEvent.getLeftFragment(), true);
+ scheduledObjectNum++;
+ if (castEvent.hasRightFragments()) {
+ task.addFragments(castEvent.getRightFragments());
+ }
+ stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+ } else {
+ // Non-leaf query: no task is created here. Slot 0 holds the left fragment, slot 1 the first right
+ // fragment, and any further right fragments are kept as broadcast fragments.
+ // NOTE(review): when the event has no right fragments, broadcastFragmentsForNonLeafTask keeps
+ // whatever value an earlier event left there - confirm this is intended.
+ fragmentsForNonLeafTask = new FileFragment[2];
+ fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
+ if (castEvent.hasRightFragments()) {
+ FileFragment[] rightFragments = castEvent.getRightFragments().toArray(new FileFragment[]{});
+ fragmentsForNonLeafTask[1] = rightFragments[0];
+ if (rightFragments.length > 1) {
+ broadcastFragmentsForNonLeafTask = new FileFragment[rightFragments.length - 1];
+ System.arraycopy(rightFragments, 1, broadcastFragmentsForNonLeafTask, 0, broadcastFragmentsForNonLeafTask.length);
+ } else {
+ broadcastFragmentsForNonLeafTask = null;
+ }
+ }
+ }
+ } else if (event instanceof FetchScheduleEvent) {
+ FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
+ Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
+ TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
+ Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
+ scheduledObjectNum++;
+ // NOTE(review): the remembered fragments are added once per fetch entry, and
+ // fragmentsForNonLeafTask is still null here if no FragmentScheduleEvent was handled before -
+ // confirm that callers guarantee the event order.
+ for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
+ task.addFetches(eachFetch.getKey(), eachFetch.getValue());
+ task.addFragment(fragmentsForNonLeafTask[0], true);
+ if (fragmentsForNonLeafTask[1] != null) {
+ task.addFragment(fragmentsForNonLeafTask[1], true);
+ }
+ }
+ if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) {
+ task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask));
+ }
+ stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+ } else if (event instanceof TaskAttemptToSchedulerEvent) {
+ TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
+ if (context.isLeafQuery()) {
+ scheduledRequests.addLeafTask(castEvent);
+ } else {
+ scheduledRequests.addNonLeafTask(castEvent);
+ }
+ }
+ } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
+ // When a stage is killed, unassigned task attempts are canceled from the scheduler.
+ // This event is triggered by TaskAttempt.
+ // NOTE(review): only leafTasks is updated here; nonLeafTasks and scheduledObjectNum are left as they are.
+ TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event;
+ scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId());
+ LOG.info(castedEvent.getTaskAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
+ ((TaskAttemptToSchedulerEvent) event).getTaskAttempt().handle(
+ new TaskAttemptEvent(castedEvent.getTaskAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
+ }
+ }
+
+ /**
+  * Queues a task request and, when it looks worthwhile, wakes the scheduling thread right away
+  * instead of letting it wait for its next timeout.
+  *
+  * The thread is woken only if tasks remain to be scheduled and either the remaining tasks do not
+  * exceed the number of known hosts or at least one request per known host is queued.
+  *
+  * @param event the task request sent on behalf of a task runner
+  */
+ @Override
+ public void handleTaskRequestEvent(TaskRequestEvent event) {
+   taskRequests.handle(event);
+   final int hostCount = scheduledRequests.leafTaskHostMapping.size();
+
+   if (remainingScheduledObjectNum() <= 0) {
+     return;
+   }
+
+   // if available cluster resources are larger than tasks, the scheduler thread works immediately.
+   final boolean fewTasksLeft = remainingScheduledObjectNum() <= hostCount;
+   if (!fewTasksLeft && hostCount > taskRequests.size()) {
+     return;
+   }
+
+   synchronized (schedulingThread) {
+     schedulingThread.notifyAll();
+   }
+ }
+
+ /**
+ * @return the current value of {@code scheduledObjectNum}, which is incremented when a task is created
+ * in {@code handle} and decremented when a task is handed to a task request
+ */
+ @Override
+ public int remainingScheduledObjectNum() {
+ return scheduledObjectNum;
+ }
+
+ /**
+  * Queue of task requests that are waiting to be matched with a task.
+  * Once the scheduler has been stopped, new requests are answered with the stop request at once.
+  */
+ private class TaskRequests implements EventHandler<TaskRequestEvent> {
+   private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
+       new LinkedBlockingQueue<TaskRequestEvent>();
+
+   /**
+    * Enqueues a request, or answers it with {@code stopTaskRunnerReq} when the scheduler is stopped.
+    *
+    * @param event the incoming task request
+    */
+   @Override
+   public void handle(TaskRequestEvent event) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+     }
+
+     if (stopEventHandling.get()) {
+       event.getCallback().run(stopTaskRunnerReq);
+       return;
+     }
+
+     // report the queue length each time it sits at a non-zero multiple of 1000
+     final int queued = taskRequestQueue.size();
+     if (queued != 0 && queued % 1000 == 0) {
+       LOG.info("Size of event-queue in DefaultTaskScheduler is " + queued);
+     }
+
+     final int freeSlots = taskRequestQueue.remainingCapacity();
+     if (freeSlots < 1000) {
+       LOG.warn("Very low remaining capacity in the event-queue "
+           + "of DefaultTaskScheduler: " + freeSlots);
+     }
+
+     taskRequestQueue.add(event);
+   }
+
+   /**
+    * Moves up to {@code num} queued requests into the given collection.
+    *
+    * @param taskRequests destination collection
+    * @param num maximum number of requests to move
+    */
+   public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
+                               int num) {
+     taskRequestQueue.drainTo(taskRequests, num);
+   }
+
+   /** @return the number of requests currently queued */
+   public int size() {
+     return taskRequestQueue.size();
+   }
+ }
+
+ /**
+ * One worker can have multiple running task runners. <code>HostVolumeMapping</code>
+ * describes various information for one worker, including :
+ * <ul>
+ * <li>host name</li>
+ * <li>rack name</li>
+ * <li>unassigned tasks for each disk volume</li>
+ * <li>last assigned volume id - it can be used for assigning task in a round-robin manner</li>
+ * <li>the number of running tasks for each volume</li>
+ * </ul>, each task runner and the concurrency number of running tasks for volumes.
+ *
+ * Here, we identify a task runner by {@link ContainerId}, and we use volume ids to identify
+ * all disks in this node. Each volume id is only used to distinguish disks from one another; we do not
+ * know which physical disk a given volume id refers to. For details about volume ids, see the section below.
+ *
+ * <h3>Volume id</h3>
+ * Volume id is an integer. Each volume id identifies each disk volume.
+ *
+ * This volume id can be obtained from {@link org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}.
+ * HDFS may be unable to give any volume id, either for an unknown reason or because the config
+ * 'dfs.client.file-block-locations.enabled' is disabled.
+ * In this case, the volume id will be -1 or another negative integer.
+ *
+ * <h3>See Also</h3>
+ * <ul>
+ * <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li>
+ * </ul>
+ */
+ public class HostVolumeMapping {
+ private final String host;
+ private final String rack;
+ /** A key is disk volume, and a value is a list of tasks to be scheduled. */
+ private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume =
+ Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>());
+ /** A value is last assigned volume id for each task runner */
+ private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId,
+ Integer>();
+ /**
+ * A key is disk volume id, and a value is the load of this volume.
+ * This load is measured by counting how many number of tasks are running.
+ *
+ * These disk volumes are kept in ascending order of the volume id.
+ * In other words, the head volume id is likely to be -1, meaning no given volume id.
+ */
+ private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>();
+ /** The total number of remain tasks in this host */
+ private AtomicInteger remainTasksNum = new AtomicInteger(0);
+ public static final int REMOTE = -2;
+
+
+ /**
+ * @param host host name this mapping describes
+ * @param rack rack name stored for this host
+ */
+ public HostVolumeMapping(String host, String rack){
+ this.host = host;
+ this.rack = rack;
+ }
+
+ /**
+  * Registers a task attempt as unassigned work on one volume of this host.
+  * The remaining-task counter is incremented, and the volume gets a load entry of 0 if it has none.
+  *
+  * @param volumeId volume the attempt's data resides on
+  * @param attemptId the task attempt to register (despite its name, this is the attempt, not its id)
+  */
+ public synchronized void addTaskAttempt(int volumeId, TaskAttempt attemptId){
+   synchronized (unassignedTaskForEachVolume) {
+     LinkedHashSet<TaskAttempt> pending = unassignedTaskForEachVolume.get(volumeId);
+     if (pending == null) {
+       pending = new LinkedHashSet<TaskAttempt>();
+       unassignedTaskForEachVolume.put(volumeId, pending);
+     }
+     pending.add(attemptId);
+   }
+
+   remainTasksNum.incrementAndGet();
+
+   if (!diskVolumeLoads.containsKey(volumeId)) {
+     diskVolumeLoads.put(volumeId, 0);
+   }
+ }
+
+ /**
+ * Picks the next unassigned task of this host for the given container.
+ *
+ * Priorities
+ * 1. a task list in a volume of host
+ * 2. unknown block or Non-splittable task in host
+ * 3. remote tasks. unassignedTaskForEachVolume contains only local tasks, so the result will be null
+ *
+ * @param containerId the container (task runner) asking for a task
+ * @return the id of the chosen task attempt, or null when none was found within the retry budget
+ */
+ public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) {
+ int volumeId;
+ TaskAttemptId taskAttemptId = null;
+
+ // A container seen for the first time is bound to the volume returned by getLowestVolumeId();
+ // otherwise it continues with the volume it was assigned last.
+ if (!lastAssignedVolumeId.containsKey(containerId)) {
+ volumeId = getLowestVolumeId();
+ increaseConcurrency(containerId, volumeId);
+ } else {
+ volumeId = lastAssignedVolumeId.get(containerId);
+ }
+
+ if (unassignedTaskForEachVolume.size() > 0) {
+ // at most one attempt per volume entry that existed when the search started
+ int retry = unassignedTaskForEachVolume.size();
+ do {
+ //clean and get a remaining local task
+ taskAttemptId = getAndRemove(volumeId);
+ // the volume entry is gone once its task set is exhausted: release the container's binding to it
+ if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
+ decreaseConcurrency(containerId);
+ if (volumeId > REMOTE) {
+ diskVolumeLoads.remove(volumeId);
+ }
+ }
+
+ if (taskAttemptId == null) {
+ //reassign next volume
+ volumeId = getLowestVolumeId();
+ increaseConcurrency(containerId, volumeId);
+ retry--;
+ } else {
+ break;
+ }
+ } while (retry > 0);
+ } else {
+ // no volume has unassigned tasks left: reset the counter
+ this.remainTasksNum.set(0);
+ }
+ return taskAttemptId;
+ }
+
+ /**
+  * Takes one unassigned task from this host on behalf of a requester in the given rack.
+  *
+  * Nothing is taken unless this host belongs to {@code rack}. Volumes are tried in the order given
+  * by {@link #getLowestVolumeId()}, at most once per volume entry that exists when the search starts;
+  * a volume that yields no task is dropped from the load table (unless it is the REMOTE pseudo volume).
+  *
+  * @param rack rack name of the requester
+  * @return the id of the removed task attempt, or null if none was found
+  */
+ public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) {
+   if (unassignedTaskForEachVolume.size() <= 0 || !this.rack.equals(rack)) {
+     return null;
+   }
+
+   for (int triesLeft = unassignedTaskForEachVolume.size(); triesLeft > 0; triesLeft--) {
+     //clean and get a remaining task
+     final int candidateVolume = getLowestVolumeId();
+     final TaskAttemptId found = getAndRemove(candidateVolume);
+     if (found != null) {
+       return found;
+     }
+     if (candidateVolume > REMOTE) {
+       diskVolumeLoads.remove(candidateVolume);
+     }
+   }
+   return null;
+ }
+
+ /**
+ * Removes and returns the first unassigned task attempt of the given volume.
+ *
+ * When an attempt is taken, the remaining-task counter is decremented and the mappings of the other
+ * hosts listed in the task's data locations are asked to drop the same attempt. The volume's entry is
+ * removed from the map once its task set is empty.
+ *
+ * @param volumeId volume to take a task from
+ * @return the id of the removed attempt, or null if the volume has no entry or no tasks
+ */
+ private synchronized TaskAttemptId getAndRemove(int volumeId){
+ TaskAttemptId taskAttemptId = null;
+ if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId;
+
+ LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+ if(list != null && list.size() > 0){
+ TaskAttempt taskAttempt;
+ synchronized (unassignedTaskForEachVolume) {
+ // take the first element in the set's iteration order
+ Iterator<TaskAttempt> iterator = list.iterator();
+ taskAttempt = iterator.next();
+ iterator.remove();
+ }
+
+ this.remainTasksNum.getAndDecrement();
+ taskAttemptId = taskAttempt.getId();
+ // the attempt was registered under every data location; drop it from the other hosts
+ for (DataLocation location : taskAttempt.getTask().getDataLocations()) {
+ if (!this.getHost().equals(location.getHost())) {
+ HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
+ if (volumeMapping != null) {
+ volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt);
+ }
+ }
+ }
+ }
+
+ if(list == null || list.isEmpty()) {
+ unassignedTaskForEachVolume.remove(volumeId);
+ }
+ return taskAttemptId;
+ }
+
+ /**
+  * Drops a task attempt from the unassigned set of the given volume, typically because another
+  * host has already taken it.
+  *
+  * The remaining-task counter is decremented only when the attempt was actually present in the set.
+  * Decrementing unconditionally makes the counter fall below the real number of unassigned tasks,
+  * and that counter bounds the allocation loops in {@code allocateLocalTask} and
+  * {@code allocateRackTask}.
+  *
+  * @param volumeId volume the attempt was registered under
+  * @param taskAttempt the attempt to drop
+  */
+ private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){
+   if (!unassignedTaskForEachVolume.containsKey(volumeId)) {
+     return;
+   }
+
+   LinkedHashSet<TaskAttempt> tasks = unassignedTaskForEachVolume.get(volumeId);
+
+   if (tasks != null && tasks.size() > 0) {
+     // Set.remove reports whether the element was present
+     if (tasks.remove(taskAttempt)) {
+       remainTasksNum.getAndDecrement();
+     }
+   } else {
+     unassignedTaskForEachVolume.remove(volumeId);
+   }
+ }
+
+ /**
+  * Binds a task runner to a volume and raises that volume's load by one.
+  * A volume without a load entry starts at a load of 1.
+  *
+  * @param containerId The task runner identifier
+  * @param volumeId Volume identifier
+  * @return the new volume load (i.e., how many running tasks use this volume)
+  */
+ private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) {
+   final int concurrency = diskVolumeLoads.containsKey(volumeId) ? diskVolumeLoads.get(volumeId) + 1 : 1;
+
+   if (volumeId == REMOTE) {
+     // all blocks on this host have been processed; the runner will be given remote tasks
+     LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
+         + ", Remote Concurrency : " + concurrency);
+   } else if (volumeId == -1) {
+     // the volume is unknown: disabled namenode block metadata, compressed text file or amazon s3
+     LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
+   } else if (volumeId > -1) {
+     LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
+   }
+
+   diskVolumeLoads.put(volumeId, concurrency);
+   lastAssignedVolumeId.put(containerId, volumeId);
+   return concurrency;
+ }
+
+ /**
+  * Releases a task runner's binding to its last assigned volume.
+  *
+  * If that volume still has a load entry, a positive load is lowered by one; a load of zero or less
+  * causes the entry to be removed, except for the REMOTE pseudo volume. The binding itself is
+  * always removed.
+  *
+  * @param containerId The task runner identifier
+  */
+ private synchronized void decreaseConcurrency(TajoContainerId containerId){
+   final Integer boundVolume = lastAssignedVolumeId.get(containerId);
+
+   if (boundVolume != null && diskVolumeLoads.containsKey(boundVolume)) {
+     final Integer running = diskVolumeLoads.get(boundVolume);
+     if (running > 0) {
+       diskVolumeLoads.put(boundVolume, running - 1);
+     } else if (boundVolume > REMOTE) {
+       diskVolumeLoads.remove(boundVolume);
+     }
+   }
+
+   lastAssignedVolumeId.remove(containerId);
+ }
+
+ /**
+  * Returns the id of the volume with the smallest load. When several volumes share the smallest
+  * load, the one that comes last in ascending volume-id order is returned.
+  *
+  * volume of a host : 0 ~ n
+  * compressed task, amazon s3, unKnown volume : -1
+  * remote task : -2
+  *
+  * @return the chosen volume id, or {@code REMOTE} if no volume has a load entry
+  */
+ public int getLowestVolumeId(){
+   Map.Entry<Integer, Integer> lightest = null;
+
+   for (Map.Entry<Integer, Integer> candidate : diskVolumeLoads.entrySet()) {
+     // ">=" lets a later volume replace an earlier one with the same load
+     if (lightest == null || lightest.getValue() >= candidate.getValue()) {
+       lightest = candidate;
+     }
+   }
+
+   return lightest == null ? REMOTE : lightest.getKey();
+ }
+
+ /** @return true if the given task runner is currently bound to a volume of this host */
+ public boolean isAssigned(TajoContainerId containerId){
+   return lastAssignedVolumeId.containsKey(containerId);
+ }
+
+ /** @return true if the given task runner is bound to the REMOTE pseudo volume (or a lower id) */
+ public boolean isRemote(TajoContainerId containerId){
+   final Integer boundVolume = lastAssignedVolumeId.get(containerId);
+   return boundVolume != null && boundVolume <= REMOTE;
+ }
+
+ /** @return the load recorded for the REMOTE pseudo volume, or 0 if there is none */
+ public int getRemoteConcurrency(){
+   return getVolumeConcurrency(REMOTE);
+ }
+
+ /**
+  * @param volumeId volume identifier
+  * @return the load recorded for the volume, or 0 if the volume has no load entry
+  */
+ public int getVolumeConcurrency(int volumeId){
+   final Integer load = diskVolumeLoads.get(volumeId);
+   return load == null ? 0 : load;
+ }
+
+ /** @return the current value of the remaining-task counter of this host */
+ public int getRemainingLocalTaskSize(){
+   return remainTasksNum.get();
+ }
+
+ /** @return the host name this mapping describes */
+ public String getHost() {
+   return host;
+ }
+
+ /** @return the rack name stored for this host */
+ public String getRack() {
+   return rack;
+ }
+ }
+
+ private class ScheduledRequests {
+ // The two sets leafTasks and nonLeafTasks keep all tasks that are still to be scheduled. Even if a task is
+ // included in leafTaskHostMapping or leafTasksRackMapping, it will not be sent to a task runner
+ // unless it is also included in leafTasks or nonLeafTasks.
+ private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
+ private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
+ private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap();
+ private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
+
+ /**
+  * Registers a leaf task attempt for scheduling.
+  *
+  * For every data location of the task, the attempt is added to the mapping of that host (created on
+  * first use, with the rack resolved through {@code RackResolver}) and its id is added to the set of
+  * that host's rack. Finally the id is added to {@code leafTasks}.
+  *
+  * @param event event carrying the task attempt to register
+  */
+ private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) {
+   final TaskAttempt attempt = event.getTaskAttempt();
+
+   for (DataLocation location : attempt.getTask().getDataLocations()) {
+     final String hostName = location.getHost();
+
+     HostVolumeMapping mapping = leafTaskHostMapping.get(hostName);
+     if (mapping == null) {
+       mapping = new HostVolumeMapping(hostName, RackResolver.resolve(hostName).getNetworkLocation());
+       leafTaskHostMapping.put(hostName, mapping);
+     }
+     mapping.addTaskAttempt(location.getVolumeId(), attempt);
+
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Added attempt req to host " + hostName);
+     }
+
+     HashSet<TaskAttemptId> rackTasks = leafTasksRackMapping.get(mapping.getRack());
+     if (rackTasks == null) {
+       rackTasks = new HashSet<TaskAttemptId>();
+       leafTasksRackMapping.put(mapping.getRack(), rackTasks);
+     }
+     rackTasks.add(attempt.getId());
+
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Added attempt req to rack " + mapping.getRack());
+     }
+   }
+
+   leafTasks.add(attempt.getId());
+ }
+
+ /** Adds the id of the event's task attempt to the set of non-leaf tasks waiting for a request. */
+ private void addNonLeafTask(TaskAttemptToSchedulerEvent event) {
+ nonLeafTasks.add(event.getTaskAttempt().getId());
+ }
+
+ /** @return the number of leaf task attempts currently held in {@code leafTasks} */
+ public int leafTaskNum() {
+ return leafTasks.size();
+ }
+
+ /** @return the number of non-leaf task attempts currently held in {@code nonLeafTasks} */
+ public int nonLeafTaskNum() {
+ return nonLeafTasks.size();
+ }
+
+ public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>();
+
+ /**
+  * Tries to find a task whose data resides on the requesting host.
+  *
+  * Tasks are pulled from the host's mapping until one is found that is still in {@code leafTasks};
+  * that task is removed from {@code leafTasks} and counted as a host-local assignment.
+  *
+  * @param host host name of the requesting worker
+  * @param containerId container (task runner) that requested a task
+  * @return the id of the allocated task attempt, or null if the host has no mapping or no usable task
+  */
+ private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){
+   final HostVolumeMapping mapping = leafTaskHostMapping.get(host);
+   if (mapping == null) {
+     // no data of this stage is known to reside on this host
+     return null;
+   }
+
+   // the bound is re-read on every iteration, so it shrinks while tasks are pulled
+   for (int pulled = 0; pulled < mapping.getRemainingLocalTaskSize(); pulled++) {
+     final TaskAttemptId candidate = mapping.getLocalTask(containerId);
+     if (candidate == null) {
+       break;
+     }
+
+     //find remaining local task
+     if (leafTasks.contains(candidate)) {
+       leafTasks.remove(candidate);
+       hostLocalAssigned++;
+       totalAssigned++;
+       return candidate;
+     }
+   }
+   return null;
+ }
+
+ /**
+ * Tries to find a task for a requester on the given host among the tasks of its rack.
+ *
+ * First the host mappings are visited in descending order of their remaining-task counters, and each
+ * is asked for a task of the requester's rack. If that yields nothing, the id set registered for the
+ * rack is consumed until an id that is still in {@code leafTasks} turns up. A found task is removed
+ * from {@code leafTasks} and counted as a rack-local assignment.
+ *
+ * @param host host name of the requesting worker; its rack is resolved through {@code RackResolver}
+ * @return the id of the allocated task attempt, or null if none was found
+ */
+ private TaskAttemptId allocateRackTask(String host) {
+
+ // work on a snapshot of the host mappings
+ List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values());
+ String rack = RackResolver.resolve(host).getNetworkLocation();
+ TaskAttemptId attemptId = null;
+
+ if (remainingTasks.size() > 0) {
+ synchronized (scheduledRequests) {
+ //find largest remaining task of other host in rack
+ // NOTE(review): the comparator reads live counters; confirm they cannot change while the sort runs.
+ Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+ @Override
+ public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+ // descending remaining tasks
+ if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) {
+ return 1;
+ } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ });
+ }
+
+ for (HostVolumeMapping tasks : remainingTasks) {
+ for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) {
+ // returns null right away when the mapping belongs to a different rack
+ TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack);
+
+ if (tId == null) break;
+
+ if (leafTasks.contains(tId)) {
+ leafTasks.remove(tId);
+ attemptId = tId;
+ break;
+ }
+ }
+ if(attemptId != null) break;
+ }
+ }
+
+ //find task in rack
+ if (attemptId == null) {
+ HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack);
+ if (list != null) {
+ synchronized (list) {
+ // every visited id is removed from the rack set, whether or not it is still schedulable
+ Iterator<TaskAttemptId> iterator = list.iterator();
+ while (iterator.hasNext()) {
+ TaskAttemptId tId = iterator.next();
+ iterator.remove();
+ if (leafTasks.contains(tId)) {
+ leafTasks.remove(tId);
+ attemptId = tId;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (attemptId != null) {
+ rackLocalAssigned++;
+ totalAssigned++;
+
+ LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
+ hostLocalAssigned, rackLocalAssigned, totalAssigned,
+ ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
+
+ }
+ return attemptId;
+ }
+
+ /**
+ * Matches the given task requests with leaf tasks, preferring data locality.
+ *
+ * The requests are shuffled and then processed while leaf tasks remain. For each request the
+ * allocation is tried host-local first, then rack-local, then any remaining leaf task. Requests from
+ * hosts without a host mapping are deferred to a remote list while other requests are still pending.
+ * A request whose container is no longer known, or whose host already exceeds the remote concurrency
+ * limit, is answered with {@code stopTaskRunnerReq}.
+ *
+ * @param taskRequests requests to serve; consumed by this method
+ * @throws RuntimeException if no task could be picked for a request that reached the allocation step
+ */
+ public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
+ Collections.shuffle(taskRequests);
+ LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();
+
+ TaskRequestEvent taskRequest;
+ while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
+ taskRequest = taskRequests.pollFirst();
+ if(taskRequest == null) { // if there are only remote task requests
+ taskRequest = remoteTaskRequests.pollFirst();
+ }
+
+ // checking if this container is still alive.
+ // If not, ignore the task request and stop the task runner
+ ContainerProxy container = context.getMasterContext().getResourceAllocator()
+ .getContainer(taskRequest.getContainerId());
+ if(container == null) {
+ taskRequest.getCallback().run(stopTaskRunnerReq);
+ continue;
+ }
+
+ // getting the hostname of requested node
+ WorkerConnectionInfo connectionInfo =
+ context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId());
+ String host = connectionInfo.getHost();
+
+ // if there are no worker matched to the hostname a task request
+ if(!leafTaskHostMapping.containsKey(host)){
+ // NOTE(review): normalizedHost is only used for this check; the lookups below still use host.
+ String normalizedHost = NetUtils.normalizeHost(host);
+
+ if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){
+ // this case means one of either cases:
+ // * there are no blocks which reside in this node.
+ // * all blocks which reside in this node are consumed, and this task runner requests a remote task.
+ // In this case, we transfer the task request to the remote task request list, and skip the followings.
+ remoteTaskRequests.add(taskRequest);
+ continue;
+ }
+ }
+
+ TajoContainerId containerId = taskRequest.getContainerId();
+ LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+ "containerId=" + containerId);
+
+ //////////////////////////////////////////////////////////////////////
+ // disk or host-local allocation
+ //////////////////////////////////////////////////////////////////////
+ TaskAttemptId attemptId = allocateLocalTask(host, containerId);
+
+ if (attemptId == null) { // if a local task cannot be found
+ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+ if(hostVolumeMapping != null) {
+ if(!hostVolumeMapping.isRemote(containerId)){
+ // assign to remote volume
+ hostVolumeMapping.decreaseConcurrency(containerId);
+ hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
+ }
+ // this part is remote concurrency management of a tail tasks
+ // limit = remaining tasks / (2 * number of known hosts), but at least 1
+ int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
+
+ if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
+ //release container
+ hostVolumeMapping.decreaseConcurrency(containerId);
+ taskRequest.getCallback().run(stopTaskRunnerReq);
+ continue;
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////
+ // rack-local allocation
+ //////////////////////////////////////////////////////////////////////
+ attemptId = allocateRackTask(host);
+
+ //////////////////////////////////////////////////////////////////////
+ // random node allocation
+ //////////////////////////////////////////////////////////////////////
+ // NOTE(review): leafTaskNum() is checked before the lock on leafTasks is taken; confirm that the
+ // set cannot become empty in between (the cancel path in handle() removes from leafTasks).
+ if (attemptId == null && leafTaskNum() > 0) {
+ synchronized (leafTasks){
+ attemptId = leafTasks.iterator().next();
+ leafTasks.remove(attemptId);
+ // NOTE(review): this assignment is counted in rackLocalAssigned although the message below
+ // labels the counter "Remote".
+ rackLocalAssigned++;
+ totalAssigned++;
+ LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
+ hostLocalAssigned, rackLocalAssigned, totalAssigned,
+ ((double) hostLocalAssigned / (double) totalAssigned) * 100));
+ }
+ }
+ }
+
+ if (attemptId != null) {
+ // build the request that is sent back to the task runner
+ Task task = stage.getTask(attemptId.getTaskId());
+ TaskRequest taskAssign = new TaskRequestImpl(
+ attemptId,
+ new ArrayList<FragmentProto>(task.getAllFragments()),
+ "",
+ false,
+ LogicalNodeSerializer.serialize(task.getLogicalPlan()),
+ context.getMasterContext().getQueryContext(),
+ stage.getDataChannel(), stage.getBlock().getEnforcer());
+ if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
+ taskAssign.setInterQuery();
+ }
+
+ context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+ taskRequest.getContainerId(), connectionInfo));
+ assignedRequest.add(attemptId);
+
+ scheduledObjectNum--;
+ taskRequest.getCallback().run(taskAssign.getProto());
+ } else {
+ // NOTE(review): the request's callback is never invoked on this path, and the exception
+ // propagates to the caller of schedule().
+ throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
+ }
+ }
+ }
+
+ /**
+  * Tells whether a block's task requests should be flagged via {@code setInterQuery()}.
+  *
+  * @param masterPlan the plan that contains the block
+  * @param block the execution block to check
+  * @return false if the block is the root of the plan, or if its parent is the root and has a union;
+  *         true otherwise
+  */
+ private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+   if (masterPlan.isRoot(block)) {
+     return false;
+   }
+
+   final ExecutionBlock parentBlock = masterPlan.getParent(block);
+   return !(masterPlan.isRoot(parentBlock) && parentBlock.hasUnion());
+ }
+
+ /**
+  * Hands non-leaf tasks to the given task requests without any locality preference.
+  *
+  * The requests are shuffled; each one receives the task that the set iterator of
+  * {@code nonLeafTasks} yields first, together with the task's fetches. A request that finds no
+  * waiting task is dropped without an answer.
+  *
+  * @param taskRequests requests to serve; consumed by this method
+  */
+ public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
+   Collections.shuffle(taskRequests);
+
+   while (!taskRequests.isEmpty()) {
+     final TaskRequestEvent request = taskRequests.pollFirst();
+     LOG.debug("assignToNonLeafTasks: " + request.getExecutionBlockId());
+
+     if (nonLeafTasks.size() <= 0) {
+       continue;
+     }
+
+     // random allocation
+     final TaskAttemptId attemptId;
+     synchronized (nonLeafTasks) {
+       attemptId = nonLeafTasks.iterator().next();
+       nonLeafTasks.remove(attemptId);
+     }
+     LOG.debug("Assigned based on * match");
+
+     final Task task = stage.getTask(attemptId.getTaskId());
+     final TaskRequest assignment = new TaskRequestImpl(
+         attemptId,
+         Lists.newArrayList(task.getAllFragments()),
+         "",
+         false,
+         LogicalNodeSerializer.serialize(task.getLogicalPlan()),
+         context.getMasterContext().getQueryContext(),
+         stage.getDataChannel(),
+         stage.getBlock().getEnforcer());
+     if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
+       assignment.setInterQuery();
+     }
+
+     // copy every fetch of the task into the request
+     for (Map.Entry<String, Set<FetchImpl>> fetchEntry : task.getFetchMap().entrySet()) {
+       final Collection<FetchImpl> fetchesOfEntry = fetchEntry.getValue();
+       if (fetchesOfEntry == null) {
+         continue;
+       }
+       for (FetchImpl fetch : fetchesOfEntry) {
+         assignment.addFetch(fetchEntry.getKey(), fetch);
+       }
+     }
+
+     final WorkerConnectionInfo workerInfo = context.getMasterContext().getResourceAllocator()
+         .getWorkerConnectionInfo(request.getWorkerId());
+     context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+         request.getContainerId(), workerInfo));
+     request.getCallback().run(assignment.getProto());
+     totalAssigned++;
+     scheduledObjectNum--;
+   }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
new file mode 100644
index 0000000..5fe2f80
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task scheduler event that carries, for one execution block, a map from a string key
+ * to the list of fetches registered under that key.
+ */
+public class FetchScheduleEvent extends TaskSchedulerEvent {
+ /** Fetches carried by this event; the map is stored by reference, not copied. */
+ private final Map<String, List<FetchImpl>> fetches;
+
+ /**
+ * @param eventType event type, forwarded to the TaskSchedulerEvent constructor
+ * @param blockId execution block id, forwarded to the TaskSchedulerEvent constructor
+ * @param fetches fetches to carry; kept as-is, without a defensive copy
+ */
+ public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
+ final Map<String, List<FetchImpl>> fetches) {
+ super(eventType, blockId);
+ this.fetches = fetches;
+ }
+
+ /** @return the same map instance that was passed to the constructor */
+ public Map<String, List<FetchImpl>> getFetches() {
+ return fetches;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
new file mode 100644
index 0000000..2932694
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.history.QueryHistory;
+import org.apache.tajo.util.history.StageHistory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class Query implements EventHandler<QueryEvent> {
+ private static final Log LOG = LogFactory.getLog(Query.class);
+
+ // Facilities for Query
+ private final TajoConf systemConf;
+ private final Clock clock;
+ private String queryStr;
+ private Map<ExecutionBlockId, Stage> stages;
+ private final EventHandler eventHandler;
+ private final MasterPlan plan;
+ QueryMasterTask.QueryMasterTaskContext context;
+ private ExecutionBlockCursor cursor;
+
+ // Query Status
+ private final QueryId id;
+ private long appSubmitTime;
+ private long startTime;
+ private long finishTime;
+ private TableDesc resultDesc;
+ private int completedStagesCount = 0;
+ private int successedStagesCount = 0;
+ private int killedStagesCount = 0;
+ private int failedStagesCount = 0;
+ private int erroredStagesCount = 0;
+ private final List<String> diagnostics = new ArrayList<String>();
+
+ // Internal Variables
+ private final Lock readLock;
+ private final Lock writeLock;
+ private int priority = 100;
+
+ // State Machine
+ private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+ private QueryState queryState;
+
+ // Transition Handler
+ private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+ private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
+ private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
+
+ protected static final StateMachineFactory
+ <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+ new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+ (QueryState.QUERY_NEW)
+
+ // Transitions from NEW state
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING,
+ QueryEventType.START,
+ new StartTransition())
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+ QueryEventType.KILL,
+ new KillNewQueryTransition())
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from RUNNING state
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+ QueryEventType.STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_RUNNING,
+ EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+ QueryState.QUERY_ERROR),
+ QueryEventType.QUERY_COMPLETED,
+ QUERY_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.KILL,
+ new KillAllStagesTransition())
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from QUERY_SUCCEEDED state
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ // ignore-able transitions
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.KILL)
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from KILL_WAIT state
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT,
+ EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+ QueryState.QUERY_ERROR),
+ QueryEventType.QUERY_COMPLETED,
+ QUERY_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED),
+ QueryEventType.KILL,
+ QUERY_COMPLETED_TRANSITION)
+
+ // Transitions from FAILED state
+ .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+ QueryEventType.KILL)
+
+ // Transitions from ERROR state
+ .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+ EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED))
+
+ .installTopology();
+
+ /**
+  * Creates a query positioned at the initial state of its state machine and logs the
+  * order in which the execution block cursor will hand out the plan's blocks.
+  *
+  * @param context       query master task context; also supplies the configuration and the clock
+  * @param id            identifier of this query
+  * @param appSubmitTime submit time reported by getAppSubmitTime()
+  * @param queryStr      text of the query
+  * @param eventHandler  handler that receives the events emitted by this query's transitions
+  * @param plan          master plan whose execution blocks the cursor walks
+  */
+ public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
+              final long appSubmitTime,
+              final String queryStr,
+              final EventHandler eventHandler,
+              final MasterPlan plan) {
+   // Collaborators taken from, or passed along with, the context.
+   this.context = context;
+   this.systemConf = context.getConf();
+   this.clock = context.getClock();
+   this.eventHandler = eventHandler;
+
+   // Identity and inputs of the query itself.
+   this.id = id;
+   this.appSubmitTime = appSubmitTime;
+   this.queryStr = queryStr;
+   this.plan = plan;
+   this.stages = Maps.newConcurrentMap();
+   this.cursor = new ExecutionBlockCursor(plan, true);
+
+   // Walk the cursor once to log the execution order, then rewind it for real use.
+   StringBuilder banner = new StringBuilder("\n=======================================================");
+   banner.append("\nThe order of execution: \n");
+   for (int position = 1; cursor.hasNext(); position++) {
+     ExecutionBlock block = cursor.nextBlock();
+     banner.append("\n").append(position).append(": ").append(block.getId());
+   }
+   banner.append("\n=======================================================");
+   LOG.info(banner);
+   cursor.reset();
+
+   // Both lock views come from one read/write lock.
+   ReadWriteLock stateLock = new ReentrantReadWriteLock();
+   this.readLock = stateLock.readLock();
+   this.writeLock = stateLock.writeLock();
+
+   stateMachine = stateMachineFactory.make(this);
+   queryState = stateMachine.getCurrentState();
+ }
+
+ /**
+  * Estimates the overall progress of this query.
+  *
+  * @return 1.0 once the query has succeeded; otherwise the sum of the progress of every
+  *         stage that has left the NEW state, each weighted by
+  *         1 / (number of blocks in the cursor - 1). Returns 0.0 when the cursor holds
+  *         fewer than two blocks, because no meaningful weight exists in that case.
+  */
+ public float getProgress() {
+   if (getState() == QueryState.QUERY_SUCCEEDED) {
+     return 1.0f;
+   }
+
+   // Minus one: presumably accounts for the terminal block, which
+   // StageCompletedTransition never schedules as a stage -- TODO confirm.
+   int weightedBlocks = getExecutionBlockCursor().size() - 1;
+   if (weightedBlocks <= 0) {
+     // Dividing here would produce Infinity/NaN (one block) or a negative weight (none).
+     return 0.0f;
+   }
+   float proportion = 1.0f / (float) weightedBlocks;
+
+   // Work on a snapshot so stages added meanwhile do not affect this computation.
+   List<Stage> tempStages = new ArrayList<Stage>();
+   synchronized (stages) {
+     tempStages.addAll(stages.values());
+   }
+
+   float totalProgress = 0.0f;
+   for (Stage stage : tempStages) {
+     // Stages still in NEW contribute nothing.
+     if (stage.getState() != StageState.NEW) {
+       totalProgress += stage.getProgress() * proportion;
+     }
+   }
+   return totalProgress;
+ }
+
+ /** @return the application submit time stored in this query */
+ public long getAppSubmitTime() {
+ return this.appSubmitTime;
+ }
+
+ /** @return the start time recorded by setStartTime() */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /** Records the current time of this query's clock as the start time. */
+ public void setStartTime() {
+ startTime = clock.getTime();
+ }
+
+ /** @return the finish time recorded by setFinishTime() */
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ /** Records the current time of this query's clock as the finish time. */
+ public void setFinishTime() {
+ finishTime = clock.getTime();
+ }
+
+ /**
+  * Assembles the history record of this query: the query-level summary plus one
+  * history entry per stage.
+  *
+  * @return a newly built history object
+  */
+ public QueryHistory getQueryHistory() {
+   QueryHistory history = makeQueryHistory();
+   List<StageHistory> perStage = makeStageHistories();
+   history.setStageHistories(perStage);
+   return history;
+ }
+
+ /**
+  * Collects the history of every stage currently registered with this query.
+  *
+  * @return a new list with one entry per stage, in the iteration order of getStages()
+  */
+ private List<StageHistory> makeStageHistories() {
+   List<StageHistory> collected = new ArrayList<StageHistory>();
+   Iterator<Stage> stageIterator = getStages().iterator();
+   while (stageIterator.hasNext()) {
+     collected.add(stageIterator.next().getStageHistory());
+   }
+   return collected;
+ }
+
+ /**
+  * Builds the query-level part of the history: query id, query master name and HTTP info
+  * port, logical plan, distributed plan, and the public session variables found in the
+  * plan's context. Stage histories are not filled in here.
+  *
+  * @return a newly built history object
+  */
+ private QueryHistory makeQueryHistory() {
+   QueryHistory queryHistory = new QueryHistory();
+
+   queryHistory.setQueryId(getId().toString());
+   queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName());
+   queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort());
+   // The logical plan is set exactly once; the master plan's own string form is only
+   // used for the distributed plan below.
+   queryHistory.setLogicalPlan(plan.getLogicalPlan().toString());
+   queryHistory.setDistributedPlan(plan.toString());
+
+   // Only variables that are known session variables and flagged public are recorded.
+   List<String[]> sessionVariables = new ArrayList<String[]>();
+   for (Map.Entry<String, String> entry : plan.getContext().getAllKeyValus().entrySet()) {
+     if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) {
+       sessionVariables.add(new String[]{entry.getKey(), entry.getValue()});
+     }
+   }
+   queryHistory.setSessionVariables(sessionVariables);
+
+   return queryHistory;
+ }
+
+ /**
+  * Returns a snapshot of the diagnostic messages recorded so far.
+  *
+  * The copy is taken while holding the read lock. Transitions run by handle() append
+  * diagnostics while the write lock is held, so the returned list cannot change under
+  * the caller. Handing out the internal list itself would make the lock pointless,
+  * because the reference outlives the locked region.
+  *
+  * @return a new list containing the current diagnostics; never null
+  */
+ public List<String> getDiagnostics() {
+   readLock.lock();
+   try {
+     return new ArrayList<String>(diagnostics);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+ * Appends one diagnostic message. This method takes no lock itself.
+ *
+ * @param diag message to append
+ */
+ protected void addDiagnostic(String diag) {
+ diagnostics.add(diag);
+ }
+
+ /** @return the result table descriptor set by setResultDesc(), or null if none was set */
+ public TableDesc getResultDesc() {
+ return resultDesc;
+ }
+
+ /** Stores the descriptor of the table that holds this query's result. */
+ public void setResultDesc(TableDesc desc) {
+ resultDesc = desc;
+ }
+
+ /** @return the master plan given to the constructor */
+ public MasterPlan getPlan() {
+ return plan;
+ }
+
+ /** @return the state machine instance created for this query in the constructor */
+ public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+ return stateMachine;
+ }
+
+ /** Registers the stage under its own id, replacing any stage already stored for that id. */
+ public void addStage(Stage stage) {
+ stages.put(stage.getId(), stage);
+ }
+
+ /** @return the identifier of this query */
+ public QueryId getId() {
+ return this.id;
+ }
+
+ /** @return the stage registered for the given execution block id, or null if there is none */
+ public Stage getStage(ExecutionBlockId id) {
+ return this.stages.get(id);
+ }
+
+ /** @return the values view of the stage map (not a copy) */
+ public Collection<Stage> getStages() {
+ return this.stages.values();
+ }
+
+ /**
+  * Reads the state machine's current state while holding the read lock.
+  *
+  * @return the current state of the state machine
+  */
+ public QueryState getSynchronizedState() {
+   final QueryState current;
+   readLock.lock();
+   try {
+     current = stateMachine.getCurrentState();
+   } finally {
+     readLock.unlock();
+   }
+   return current;
+ }
+
+ /**
+ * Non-blocking call for the client API: returns the state cached by handle() without
+ * taking any lock.
+ *
+ * NOTE(review): queryState is a plain (non-volatile) field that handle() writes under
+ * the write lock, so a reader here may observe a stale value -- confirm this is acceptable.
+ */
+ public QueryState getState() {
+ return queryState;
+ }
+
+ /** @return the cursor over the plan's execution blocks created in the constructor */
+ public ExecutionBlockCursor getExecutionBlockCursor() {
+ return cursor;
+ }
+
+ /**
+  * Transition taken on START: stamps the start time, then creates, registers and
+  * initializes the stage for the first block handed out by the cursor.
+  */
+ public static class StartTransition
+     implements SingleArcTransition<Query, QueryEvent> {
+
+   @Override
+   public void transition(Query query, QueryEvent queryEvent) {
+     query.setStartTime();
+
+     ExecutionBlock firstBlock = query.getExecutionBlockCursor().nextBlock();
+     Stage firstStage = new Stage(query.context, query.getPlan(), firstBlock);
+     // Each new stage takes the current priority value; the counter then decreases.
+     firstStage.setPriority(query.priority--);
+     query.addStage(firstStage);
+
+     StageEvent initEvent = new StageEvent(firstStage.getId(), StageEventType.SQ_INIT);
+     firstStage.handle(initEvent);
+     LOG.debug("Schedule unit plan: \n" + firstStage.getBlock().getPlan());
+   }
+ }
+
+ public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+ /**
+  * Derives the final query state from the state carried by the completion event.
+  * A SUCCEEDED stage triggers finalizeQuery(); FAILED and KILLED map to the query states
+  * of the same name; anything else maps to QUERY_ERROR. Whenever the result is not
+  * QUERY_SUCCEEDED, the output commit of the last stage is rolled back on a best-effort
+  * basis. Finally the finish time is recorded and completion is announced.
+  *
+  * NOTE(review): the state machine also routes KILL events received in QUERY_KILL_WAIT
+  * to this transition; confirm those events are QueryCompletedEvent instances,
+  * otherwise the cast below fails.
+  *
+  * @param query      query being completed
+  * @param queryEvent event expected to be a QueryCompletedEvent
+  * @return the final state of the query
+  */
+ @Override
+ public QueryState transition(Query query, QueryEvent queryEvent) {
+   QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent;
+   QueryState finalState;
+
+   if (stageEvent.getState() == StageState.SUCCEEDED) {
+     finalState = finalizeQuery(query, stageEvent);
+   } else if (stageEvent.getState() == StageState.FAILED) {
+     finalState = QueryState.QUERY_FAILED;
+   } else if (stageEvent.getState() == StageState.KILLED) {
+     finalState = QueryState.QUERY_KILLED;
+   } else {
+     finalState = QueryState.QUERY_ERROR;
+   }
+
+   if (finalState != QueryState.QUERY_SUCCEEDED) {
+     Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
+     if (lastStage != null && lastStage.getTableMeta() != null) {
+       StoreType storeType = lastStage.getTableMeta().getStoreType();
+       if (storeType != null) {
+         LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+         try {
+           StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
+         } catch (IOException e) {
+           // Best effort: a failed rollback is logged and does not change the final state.
+           LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+         }
+       }
+     }
+   }
+
+   // Record the finish time before announcing completion, as the kill and
+   // internal-error transitions do, so that whoever handles the completion event
+   // never observes an unset finish time.
+   query.setFinishTime();
+   query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+
+   return finalState;
+ }
+
+ /**
+  * Commits the output data of the last stage and then runs the query hooks.
+  *
+  * @param query query being finalized
+  * @param event completion event identifying the last execution block
+  * @return QUERY_SUCCEEDED when the commit and all eligible hooks complete; QUERY_ERROR,
+  *         after posting the stack trace as a diagnostics update, when anything fails,
+  *         including a missing last stage or missing table meta
+  */
+ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
+   try {
+     // Resolved inside the try: the failure path of transition() null-checks both the
+     // stage and its table meta, so either may be absent. That case must end in
+     // QUERY_ERROR instead of escaping the state machine as a NullPointerException.
+     Stage lastStage = query.getStage(event.getExecutionBlockId());
+     StoreType storeType = lastStage.getTableMeta().getStoreType();
+
+     LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+     CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+     TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+
+     Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
+         .commitOutputData(query.context.getQueryContext(),
+             lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
+
+     QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+     hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
+   } catch (Exception e) {
+     query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
+     return QueryState.QUERY_ERROR;
+   }
+
+   return QueryState.QUERY_SUCCEEDED;
+ }
+
+ /** Action that QueryHookExecutor considers running once the output has been committed. */
+ private static interface QueryHook {
+ /** @return true if execute() should be run for this query */
+ boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
+ /** Performs the hook's action; QueryHookExecutor calls it only after isEligible() returned true. */
+ void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
+ ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
+ }
+
+ private class QueryHookExecutor {
+ private List<QueryHook> hookList = TUtil.newList();
+ private QueryMaster.QueryMasterContext context;
+
+ public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
+ this.context = context;
+ hookList.add(new MaterializedResultHook());
+ hookList.add(new CreateTableHook());
+ hookList.add(new InsertTableHook());
+ }
+
+ public void execute(QueryContext queryContext, Query query,
+ ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) throws Exception {
+ for (QueryHook hook : hookList) {
+ if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
+ hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
+ }
+ }
+ }
+ }
+
+ private class MaterializedResultHook implements QueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) {
+ Stage lastStage = query.getStage(finalExecBlockId);
+ NodeType type = lastStage.getBlock().getPlan().getType();
+ return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
+ }
+
+ @Override
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+ Query query, ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) throws Exception {
+ Stage lastStage = query.getStage(finalExecBlockId);
+ TableMeta meta = lastStage.getTableMeta();
+
+ String nullChar = queryContext.get(SessionVars.NULL_CHAR);
+ meta.putOption(StorageConstants.TEXT_NULL, nullChar);
+
+ TableStats stats = lastStage.getResultStats();
+
+ TableDesc resultTableDesc =
+ new TableDesc(
+ query.getId().toString(),
+ lastStage.getSchema(),
+ meta,
+ finalOutputDir.toUri());
+ resultTableDesc.setExternal(true);
+
+ stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+ resultTableDesc.setStats(stats);
+ query.setResultDesc(resultTableDesc);
+ }
+ }
+
+ private class CreateTableHook implements QueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) {
+ Stage lastStage = query.getStage(finalExecBlockId);
+ return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
+ }
+
+ @Override
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+ Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+ CatalogService catalog = context.getWorkerContext().getCatalog();
+ Stage lastStage = query.getStage(finalExecBlockId);
+ TableStats stats = lastStage.getResultStats();
+
+ CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
+ TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions());
+
+ TableDesc tableDescTobeCreated =
+ new TableDesc(
+ createTableNode.getTableName(),
+ createTableNode.getTableSchema(),
+ meta,
+ finalOutputDir.toUri());
+ tableDescTobeCreated.setExternal(createTableNode.isExternal());
+
+ if (createTableNode.hasPartition()) {
+ tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
+ }
+
+ stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+ tableDescTobeCreated.setStats(stats);
+ query.setResultDesc(tableDescTobeCreated);
+
+ catalog.createTable(tableDescTobeCreated);
+ }
+ }
+
+ /**
+  * Hook for queries whose final plan node is INSERT: attaches the result statistics to
+  * the target table (or to a table named after the query id when the insert has no
+  * target table) and, for a target table, pushes the statistics to the catalog.
+  */
+ private class InsertTableHook implements QueryHook {
+
+   @Override
+   public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                             Path finalOutputDir) {
+     NodeType planType = query.getStage(finalExecBlockId).getBlock().getPlan().getType();
+     return planType == NodeType.INSERT;
+   }
+
+   @Override
+   public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                       Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
+       throws Exception {
+
+     CatalogService catalog = context.getWorkerContext().getCatalog();
+     Stage finalStage = query.getStage(finalExecBlockId);
+     TableMeta resultMeta = finalStage.getTableMeta();
+     TableStats resultStats = finalStage.getResultStats();
+
+     InsertNode insertNode = (InsertNode) finalStage.getBlock().getPlan();
+
+     // Either look up the existing target table or describe the output directory itself.
+     TableDesc finalTable;
+     if (!insertNode.hasTargetTable()) {
+       finalTable = new TableDesc(
+           query.getId().toString(), finalStage.getSchema(), resultMeta, finalOutputDir.toUri());
+     } else {
+       finalTable = catalog.getTableDesc(insertNode.getTableName());
+     }
+
+     resultStats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+     finalTable.setStats(resultStats);
+
+     // Only a catalog-backed target table gets its statistics updated in the catalog.
+     if (insertNode.hasTargetTable()) {
+       UpdateTableStatsProto.Builder statsUpdate = UpdateTableStatsProto.newBuilder();
+       statsUpdate.setTableName(finalTable.getName());
+       statsUpdate.setStats(resultStats.getProto());
+
+       catalog.updateTableStats(statsUpdate.build());
+     }
+
+     query.setResultDesc(finalTable);
+   }
+ }
+ }
+
+ /**
+  * Returns the length reported by the content summary of the given path, using the
+  * file system that the path resolves to under the given configuration.
+  *
+  * @param systemConf configuration used to resolve the file system
+  * @param tablePath  path to summarize
+  * @return the length from the path's content summary
+  * @throws IOException if resolving the file system or reading the summary fails
+  */
+ public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+   return tablePath.getFileSystem(systemConf).getContentSummary(tablePath).getLength();
+ }
+
+ public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+
+ private boolean hasNext(Query query) {
+ ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+ ExecutionBlock nextBlock = cursor.peek();
+ return !query.getPlan().isTerminal(nextBlock);
+ }
+
+ private void executeNextBlock(Query query) {
+ ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+ ExecutionBlock nextBlock = cursor.nextBlock();
+ Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
+ nextStage.setPriority(query.priority--);
+ query.addStage(nextStage);
+ nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
+
+ LOG.info("Scheduling Stage:" + nextStage.getId());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
+ LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
+ }
+ }
+
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ try {
+ query.completedStagesCount++;
+ StageCompletedEvent castEvent = (StageCompletedEvent) event;
+
+ if (castEvent.getState() == StageState.SUCCEEDED) {
+ query.successedStagesCount++;
+ } else if (castEvent.getState() == StageState.KILLED) {
+ query.killedStagesCount++;
+ } else if (castEvent.getState() == StageState.FAILED) {
+ query.failedStagesCount++;
+ } else if (castEvent.getState() == StageState.ERROR) {
+ query.erroredStagesCount++;
+ } else {
+ LOG.error(String.format("Invalid Stage (%s) State %s at %s",
+ castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name()));
+ query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+ }
+
+ // if a stage is succeeded and a query is running
+ if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded
+ query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR.
+ hasNext(query)) { // there remains at least one stage.
+ query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
+ executeNextBlock(query);
+ } else { // if a query is completed due to finished, kill, failure, or error
+ query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+ }
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+ }
+ }
+ }
+
+ private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+ }
+ }
+
+ /**
+ * Transition taken when a query is killed while still in QUERY_NEW: no stage exists yet,
+ * so it only records the finish time and announces completion.
+ */
+ private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ // The finish time is recorded before the completion event is handed to the handler.
+ query.setFinishTime();
+ query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+ }
+ }
+
+ private static class KillAllStagesTransition implements SingleArcTransition<Query, QueryEvent> {
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ synchronized (query.stages) {
+ for (Stage stage : query.stages.values()) {
+ query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+ }
+ }
+ }
+ }
+
+ /** Transition taken on INTERNAL_ERROR: records the finish time and announces completion. */
+ private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ // The finish time is recorded before the completion event is handed to the handler.
+ query.setFinishTime();
+ query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+ }
+ }
+
+ @Override
+ public void handle(QueryEvent event) {
+ LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+ try {
+ writeLock.lock();
+ QueryState oldState = getSynchronizedState();
+ try {
+ getStateMachine().doTransition(event.getType(), event);
+ queryState = getSynchronizedState();
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state"
+ + ", type:" + event
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getSynchronizedState().name()
+ , e);
+ eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
+ }
+
+ //notify the eventhandler of state change
+ if (oldState != getSynchronizedState()) {
+ LOG.info(id + " Query Transitioned from " + oldState + " to " + getSynchronizedState());
+ }
+ }
+
+ finally {
+ writeLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
new file mode 100644
index 0000000..bda2ec1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
+
+/**
+ * Master-side handle of a single submitted query.
+ *
+ * <p>It asks the resource manager for a QueryMaster, submits the query to that
+ * QueryMaster over RPC, applies the heartbeats it receives to its
+ * {@link QueryInfo}, and on {@link #stop()} stops the QueryMaster and appends
+ * the query info to the history writer. Events are delivered through an
+ * internal {@link AsyncDispatcher} registered for {@link QueryJobEvent.Type}.
+ */
+public class QueryInProgress extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+  /** Pause in milliseconds between two checks of the QueryMaster stop status. */
+  private static final long STOP_POLL_INTERVAL_MS = 100;
+
+  /** Upper bound in milliseconds on how long {@link #stop()} waits for the QueryMaster. */
+  private static final long STOP_WAIT_TIMEOUT_MS = 60 * 1000;
+
+  private final QueryId queryId;
+
+  private final Session session;
+
+  /** Created in {@link #init(Configuration)}. */
+  private AsyncDispatcher dispatcher;
+
+  private final LogicalRootNode plan;
+
+  /** Set once the executeQuery call has been issued to the QueryMaster. */
+  private final AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+  /** Guards {@link #stop()} against running more than once. */
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private final QueryInfo queryInfo;
+
+  private final TajoMaster.MasterContext masterContext;
+
+  /** RPC connection to the QueryMaster; {@code null} until connected. */
+  private NettyClientBase queryMasterRpc;
+
+  /** Stub obtained from {@link #queryMasterRpc}; {@code null} until connected. */
+  private QueryMasterProtocolService queryMasterRpcClient;
+
+  /**
+   * Creates the handle and records the current time as the query start time.
+   *
+   * @param masterContext context of the master this query runs under
+   * @param session       session sent to the QueryMaster with the query
+   * @param queryContext  query context stored in the query info
+   * @param queryId       identifier of the query
+   * @param sql           SQL text stored in the query info
+   * @param jsonExpr      JSON expression stored in the query info
+   * @param plan          logical plan that is serialized to JSON on submission
+   */
+  public QueryInProgress(
+      TajoMaster.MasterContext masterContext,
+      Session session,
+      QueryContext queryContext,
+      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
+    super(QueryInProgress.class.getName());
+    this.masterContext = masterContext;
+    this.session = session;
+    this.queryId = queryId;
+    this.plan = plan;
+
+    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
+    queryInfo.setStartTime(System.currentTimeMillis());
+  }
+
+  /**
+   * Creates the dispatcher, adds it as a child service and registers the
+   * event handler for {@link QueryJobEvent.Type} before initializing the service.
+   */
+  @Override
+  public void init(Configuration conf) {
+    dispatcher = new AsyncDispatcher();
+    this.addService(dispatcher);
+
+    dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
+    super.init(conf);
+  }
+
+  /**
+   * Sends a kill request for this query to the QueryMaster. Does nothing when
+   * no QueryMaster connection has been established yet.
+   */
+  public synchronized void kill() {
+    if (queryMasterRpcClient != null) {
+      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+    }
+  }
+
+  /**
+   * Stops the QueryMaster of this query, waits (bounded) until the resource
+   * manager reports it as stopped, closes the RPC connection, appends the
+   * query info to the history writer and stops the service. Only the first
+   * call has any effect.
+   *
+   * <p>If the calling thread is interrupted while waiting, the wait ends early
+   * and the interrupt status is restored once the shutdown steps have run.
+   */
+  @Override
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      return;
+    }
+
+    LOG.info("=========================================================");
+    LOG.info("Stop query:" + queryId);
+
+    masterContext.getResourceManager().stopQueryMaster(queryId);
+
+    boolean interrupted = waitForQueryMasterStop();
+    try {
+      if (queryMasterRpc != null) {
+        RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+      }
+
+      masterContext.getHistoryWriter().appendHistory(queryInfo);
+      super.stop();
+    } finally {
+      // Restore the flag only after the shutdown steps so that they are not
+      // themselves cut short, while callers can still observe the interrupt.
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Polls the resource manager until the QueryMaster is reported as stopped,
+   * the check fails with an exception, the timeout elapses, or the calling
+   * thread is interrupted.
+   *
+   * @return {@code true} if the wait ended because the thread was interrupted
+   */
+  private boolean waitForQueryMasterStop() {
+    long startTime = System.currentTimeMillis();
+    while (true) {
+      try {
+        if (masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
+          LOG.info(queryId + " QueryMaster stopped");
+          return false;
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        return false;
+      }
+
+      try {
+        synchronized (this) {
+          wait(STOP_POLL_INTERVAL_MS);
+        }
+      } catch (InterruptedException e) {
+        return true;
+      }
+      if (System.currentTimeMillis() - startTime > STOP_WAIT_TIMEOUT_MS) {
+        LOG.warn("Failed to stop QueryMaster:" + queryId);
+        return false;
+      }
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /** Returns the event handler of the internal dispatcher. */
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  /**
+   * Allocates a QueryMaster through the resource manager, copies its
+   * connection details into the query info and posts a
+   * {@code QUERY_MASTER_START} event.
+   *
+   * @return {@code true} if a QueryMaster was allocated and the start event
+   *         was posted; {@code false} if no resource was available or an
+   *         exception occurred (in which case the query is marked failed via
+   *         {@link #catchException(Exception)})
+   */
+  public boolean startQueryMaster() {
+    try {
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      WorkerResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+      // if no resource to allocate a query master
+      if (resource == null) {
+        LOG.info("No Available Resources for QueryMaster");
+        return false;
+      }
+
+      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
+      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
+
+      getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
+
+      return true;
+    } catch (Exception e) {
+      catchException(e);
+      return false;
+    }
+  }
+
+  /**
+   * Dispatches {@link QueryJobEvent}s of this query: heartbeats update the
+   * query info, {@code QUERY_MASTER_START} is turned into a
+   * {@code QUERY_JOB_START} event, {@code QUERY_JOB_START} submits the query
+   * and {@code QUERY_JOB_KILL} kills it. Other types are ignored.
+   */
+  class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent queryJobEvent) {
+      if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        heartbeat(queryJobEvent.getQueryInfo());
+      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+        QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
+        if (queryInProgress == null) {
+          // Defensive: presumably the lookup yields null once the query is no
+          // longer tracked by the job manager; skip instead of failing with a
+          // NullPointerException on the dispatcher thread.
+          LOG.warn("Cannot start query job, no QueryInProgress found for " + queryId);
+          return;
+        }
+        queryInProgress.getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
+      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
+        submitQueryToMaster();
+      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        kill();
+      }
+    }
+  }
+
+  /**
+   * Opens the RPC connection to the QueryMaster address stored in the query
+   * info and keeps both the connection and its stub.
+   */
+  private void connectQueryMaster() throws Exception {
+    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+    LOG.info("Connect to QueryMaster:" + addr);
+    queryMasterRpc =
+        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
+    queryMasterRpcClient = queryMasterRpc.getStub();
+  }
+
+  /**
+   * Sends the query (context, session, JSON expression and JSON-serialized
+   * logical plan) to the QueryMaster, connecting first if necessary. The call
+   * is skipped when the query has already been submitted. Failures are logged
+   * and leave the query un-submitted.
+   */
+  private synchronized void submitQueryToMaster() {
+    if (querySubmitted.get()) {
+      return;
+    }
+
+    try {
+      if (queryMasterRpcClient == null) {
+        connectQueryMaster();
+      }
+      if (queryMasterRpcClient == null) {
+        LOG.info("No QueryMaster connection info.");
+        //TODO wait
+        return;
+      }
+      LOG.info("Call executeQuery to :" +
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+
+      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
+      builder.setQueryId(queryId.getProto())
+          .setQueryContext(queryInfo.getQueryContext().getProto())
+          .setSession(session.getProto())
+          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
+          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
+
+      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+      querySubmitted.set(true);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Logs the exception, marks the query as {@code QUERY_FAILED} and stores the
+   * stringified stack trace as the last message of the query info.
+   */
+  public void catchException(Exception e) {
+    LOG.error(e.getMessage(), e);
+    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+    queryInfo.setLastMessage(StringUtils.stringifyException(e));
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  /** Returns {@code true} if the query has been submitted and {@link #stop()} has not been called. */
+  public boolean isStarted() {
+    return !stopped.get() && this.querySubmitted.get();
+  }
+
+  /**
+   * Applies a heartbeat from the QueryMaster to the local query info and, if
+   * the resulting state is terminal, posts a {@code QUERY_JOB_STOP} event to
+   * the query job manager.
+   *
+   * @param queryInfo the query info carried by the heartbeat
+   */
+  private void heartbeat(QueryInfo queryInfo) {
+    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+
+    // to avoid partial update by different heartbeats
+    synchronized (this.queryInfo) {
+
+      // terminal state will let client to retrieve a query result
+      // So, we must set the query result before changing query state
+      if (isFinishState(queryInfo.getQueryState()) && queryInfo.hasResultdesc()) {
+        this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+      }
+
+      this.queryInfo.setQueryState(queryInfo.getQueryState());
+      this.queryInfo.setProgress(queryInfo.getProgress());
+      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
+
+      // Update diagnosis message
+      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+        LOG.info(queryId + queryInfo.getLastMessage());
+      }
+
+      // if any error occurs, print outs the error message
+      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+      }
+
+      if (isFinishState(this.queryInfo.getQueryState())) {
+        masterContext.getQueryJobManager().getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
+      }
+    }
+  }
+
+  /** Returns {@code true} for the terminal states FAILED, KILLED and SUCCEEDED. */
+  private boolean isFinishState(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
new file mode 100644
index 0000000..1a1f2ff
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.master.QueryInfo;
+
+/**
+ * Event about a query job, carrying the {@link QueryInfo} it refers to.
+ */
+public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
+  /** Query info attached to this event; assigned once at construction. */
+  private final QueryInfo queryInfo;
+
+  /**
+   * @param type      kind of the event
+   * @param queryInfo query info the event carries
+   */
+  public QueryJobEvent(Type type, QueryInfo queryInfo) {
+    super(type);
+
+    this.queryInfo = queryInfo;
+  }
+
+  /** Returns the query info given at construction. */
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  /** Kinds of query job events. */
+  public enum Type {
+    /** Handled by {@code QueryInProgress} by submitting the query to its QueryMaster. */
+    QUERY_JOB_START,
+    /** Handled by {@code QueryInProgress} by applying the carried query info as a heartbeat. */
+    QUERY_JOB_HEARTBEAT,
+    QUERY_JOB_FINISH,
+    /** Posted by {@code QueryInProgress} to the query job manager once a heartbeat reports a terminal state. */
+    QUERY_JOB_STOP,
+    /** Posted by {@code QueryInProgress} after a QueryMaster was allocated. */
+    QUERY_MASTER_START,
+    QUERY_MASTER_STOP,
+    /** Handled by {@code QueryInProgress} by sending a kill request to the QueryMaster. */
+    QUERY_JOB_KILL
+  }
+}