Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:50 UTC
[28/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
new file mode 100644
index 0000000..409a1b1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -0,0 +1,896 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class DefaultTaskScheduler extends AbstractTaskScheduler {
+ private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
+
+ private final TaskSchedulerContext context;
+ private SubQuery subQuery;
+
+ private Thread schedulingThread;
+ private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
+
+ private ScheduledRequests scheduledRequests;
+ private TaskRequests taskRequests;
+
+ private int nextTaskId = 0;
+ private int scheduledObjectNum = 0;
+
+ public DefaultTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
+ super(DefaultTaskScheduler.class.getName());
+ this.context = context;
+ this.subQuery = subQuery;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+
+ scheduledRequests = new ScheduledRequests();
+ taskRequests = new TaskRequests();
+
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Start TaskScheduler");
+
+ this.schedulingThread = new Thread() {
+ public void run() {
+
+ while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ synchronized (schedulingThread){
+ schedulingThread.wait(100);
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ schedule();
+ }
+ LOG.info("TaskScheduler schedulingThread stopped");
+ }
+ };
+
+ this.schedulingThread.start();
+ super.start();
+ }
+
+ private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
+ public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+ static {
+ ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+ NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+
+ TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
+ TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+ builder.setId(NULL_ATTEMPT_ID.getProto());
+ builder.setShouldDie(true);
+ builder.setOutputTable("");
+ builder.setSerializedData("");
+ builder.setClusteredOutput(false);
+ stopTaskRunnerReq = builder.build();
+ }
+
+ @Override
+ public void stop() {
+ if(stopEventHandling.getAndSet(true)){
+ return;
+ }
+
+ if (schedulingThread != null) {
+ synchronized (schedulingThread) {
+ schedulingThread.notifyAll();
+ }
+ }
+
+ // Answer all pending request callbacks immediately.
+ for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
+ req.getCallback().run(stopTaskRunnerReq);
+ }
+
+ LOG.info("Task Scheduler stopped");
+ super.stop();
+ }
+
+ private FileFragment[] fragmentsForNonLeafTask;
+
+ LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>();
+ public void schedule() {
+
+ if (taskRequests.size() > 0) {
+ if (scheduledRequests.leafTaskNum() > 0) {
+ LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+ taskRequests.size() + ", LeafTask Schedule Request: " +
+ scheduledRequests.leafTaskNum());
+ taskRequests.getTaskRequests(taskRequestEvents,
+ scheduledRequests.leafTaskNum());
+ LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+ if (taskRequestEvents.size() > 0) {
+ scheduledRequests.assignToLeafTasks(taskRequestEvents);
+ taskRequestEvents.clear();
+ }
+ }
+ }
+
+ if (taskRequests.size() > 0) {
+ if (scheduledRequests.nonLeafTaskNum() > 0) {
+ LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+ taskRequests.size() + ", NonLeafTask Schedule Request: " +
+ scheduledRequests.nonLeafTaskNum());
+ taskRequests.getTaskRequests(taskRequestEvents,
+ scheduledRequests.nonLeafTaskNum());
+ scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
+ taskRequestEvents.clear();
+ }
+ }
+ }
+
+ @Override
+ public void handle(TaskSchedulerEvent event) {
+ if (event.getType() == EventType.T_SCHEDULE) {
+ if (event instanceof FragmentScheduleEvent) {
+ FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
+ if (context.isLeafQuery()) {
+ QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
+ QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+ task.addFragment(castEvent.getLeftFragment(), true);
+ scheduledObjectNum++;
+ if (castEvent.hasRightFragments()) {
+ task.addFragments(castEvent.getRightFragments());
+ //scheduledObjectNum += castEvent.getRightFragments().size();
+ }
+ subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+ } else {
+ fragmentsForNonLeafTask = new FileFragment[2];
+ fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
+ if (castEvent.hasRightFragments()) {
+ fragmentsForNonLeafTask[1] = castEvent.getRightFragments().toArray(new FileFragment[]{})[0];
+ }
+ }
+ } else if (event instanceof FetchScheduleEvent) {
+ FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
+ Map<String, List<URI>> fetches = castEvent.getFetches();
+ QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
+ QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+ scheduledObjectNum++;
+ for (Entry<String, List<URI>> eachFetch : fetches.entrySet()) {
+ task.addFetches(eachFetch.getKey(), eachFetch.getValue());
+ task.addFragment(fragmentsForNonLeafTask[0], true);
+ if (fragmentsForNonLeafTask[1] != null) {
+ task.addFragment(fragmentsForNonLeafTask[1], true);
+ }
+ }
+ subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+ } else if (event instanceof QueryUnitAttemptScheduleEvent) {
+ QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event;
+ if (context.isLeafQuery()) {
+ scheduledRequests.addLeafTask(castEvent);
+ } else {
+ scheduledRequests.addNonLeafTask(castEvent);
+ }
+ }
+ } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
+ // when a subquery is killed, unassigned query unit attempts are canceled from the scheduler.
+ // This event is triggered by QueryUnitAttempt.
+ QueryUnitAttemptScheduleEvent castedEvent = (QueryUnitAttemptScheduleEvent) event;
+ scheduledRequests.leafTasks.remove(castedEvent.getQueryUnitAttempt().getId());
+ LOG.info(castedEvent.getQueryUnitAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
+ ((QueryUnitAttemptScheduleEvent) event).getQueryUnitAttempt().handle(
+ new TaskAttemptEvent(castedEvent.getQueryUnitAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
+ }
+ }
+
+ @Override
+ public void handleTaskRequestEvent(TaskRequestEvent event) {
+
+ taskRequests.handle(event);
+ int hosts = scheduledRequests.leafTaskHostMapping.size();
+
+ // if the available cluster resources outnumber the remaining tasks, wake the scheduling thread immediately
+ if(remainingScheduledObjectNum() > 0 &&
+ (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){
+ synchronized (schedulingThread){
+ schedulingThread.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public int remainingScheduledObjectNum() {
+ return scheduledObjectNum;
+ }
+
+ private class TaskRequests implements EventHandler<TaskRequestEvent> {
+ private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
+ new LinkedBlockingQueue<TaskRequestEvent>();
+
+ @Override
+ public void handle(TaskRequestEvent event) {
+ if(LOG.isDebugEnabled()){
+ LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+ }
+
+ if(stopEventHandling.get()) {
+ event.getCallback().run(stopTaskRunnerReq);
+ return;
+ }
+ int qSize = taskRequestQueue.size();
+ if (qSize != 0 && qSize % 1000 == 0) {
+ LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+ }
+ int remCapacity = taskRequestQueue.remainingCapacity();
+ if (remCapacity < 1000) {
+ LOG.warn("Very low remaining capacity in the event-queue "
+ + "of YarnRMContainerAllocator: " + remCapacity);
+ }
+
+ taskRequestQueue.add(event);
+ }
+
+ public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
+ int num) {
+ taskRequestQueue.drainTo(taskRequests, num);
+ }
+
+ public int size() {
+ return taskRequestQueue.size();
+ }
+ }
+
+ /**
+ * One worker can have multiple running task runners. <code>HostVolumeMapping</code>
+ * describes various information for one worker, including:
+ * <ul>
+ * <li>host name</li>
+ * <li>rack name</li>
+ * <li>unassigned tasks for each disk volume</li>
+ * <li>last assigned volume id - it can be used for assigning tasks in a round-robin manner</li>
+ * <li>the number of running tasks for each volume</li>
+ * </ul>
+ *
+ * Here, we identify a task runner by {@link ContainerId}, and we use volume ids to identify
+ * all disks in this node. A volume id is only used to distinguish disks from one another;
+ * we do not know which physical disk a certain volume id indicates. The section below
+ * explains how volume ids are obtained.
+ *
+ * <h3>Volume id</h3>
+ * A volume id is an integer, and each volume id identifies one disk volume.
+ *
+ * Volume ids can be obtained from org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds().
+ * HDFS may not provide any volume id, e.g., when the config
+ * 'dfs.client.file-block-locations.enabled' is disabled. In this case, the volume id will be
+ * -1 or another negative integer.
+ *
+ * <h3>See Also</h3>
+ * <ul>
+ * <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li>
+ * </ul>
+ */
+ public class HostVolumeMapping {
+ private final String host;
+ private final String rack;
+ /** A key is a disk volume id, and a value is a list of tasks to be scheduled. */
+ private Map<Integer, LinkedHashSet<QueryUnitAttempt>> unassignedTaskForEachVolume =
+ Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<QueryUnitAttempt>>());
+ /** A value is the last assigned volume id for each task runner */
+ private HashMap<ContainerId, Integer> lastAssignedVolumeId = new HashMap<ContainerId, Integer>();
+ /**
+ * A key is a disk volume id, and a value is the load of that volume,
+ * measured by the number of tasks currently running on it.
+ *
+ * The entries are kept in ascending order of volume id, so the head entries
+ * tend to be -1, meaning volumes with no given volume id.
+ */
+ private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>();
+ /** The total number of remaining tasks on this host */
+ private AtomicInteger remainTasksNum = new AtomicInteger(0);
+ public static final int REMOTE = -2;
+
+
+ public HostVolumeMapping(String host, String rack){
+ this.host = host;
+ this.rack = rack;
+ }
+
+ public synchronized void addQueryUnitAttempt(int volumeId, QueryUnitAttempt attemptId){
+ synchronized (unassignedTaskForEachVolume){
+ LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+ if (list == null) {
+ list = new LinkedHashSet<QueryUnitAttempt>();
+ unassignedTaskForEachVolume.put(volumeId, list);
+ }
+ list.add(attemptId);
+ }
+
+ remainTasksNum.incrementAndGet();
+
+ if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0);
+ }
+
+ /**
+ * Selection priorities:
+ * 1. a task from a volume-local list of this host
+ * 2. a task with an unknown block or a non-splittable task on this host
+ * 3. a remote task. unassignedTaskForEachVolume contains only local tasks, so it will return null.
+ */
+ public synchronized QueryUnitAttemptId getLocalTask(ContainerId containerId) {
+ int volumeId;
+ QueryUnitAttemptId queryUnitAttemptId = null;
+
+ if (!lastAssignedVolumeId.containsKey(containerId)) {
+ volumeId = getLowestVolumeId();
+ increaseConcurrency(containerId, volumeId);
+ } else {
+ volumeId = lastAssignedVolumeId.get(containerId);
+ }
+
+ if (unassignedTaskForEachVolume.size() > 0) {
+ int retry = unassignedTaskForEachVolume.size();
+ do {
+ //clean and get a remaining local task
+ queryUnitAttemptId = getAndRemove(volumeId);
+ if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
+ decreaseConcurrency(containerId);
+ if (volumeId > REMOTE) {
+ diskVolumeLoads.remove(volumeId);
+ }
+ }
+
+ if (queryUnitAttemptId == null) {
+ //reassign next volume
+ volumeId = getLowestVolumeId();
+ increaseConcurrency(containerId, volumeId);
+ retry--;
+ } else {
+ break;
+ }
+ } while (retry > 0);
+ } else {
+ this.remainTasksNum.set(0);
+ }
+ return queryUnitAttemptId;
+ }
+
+ public synchronized QueryUnitAttemptId getQueryUnitAttemptIdByRack(String rack) {
+ QueryUnitAttemptId queryUnitAttemptId = null;
+
+ if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) {
+ int retry = unassignedTaskForEachVolume.size();
+ do {
+ //clean and get a remaining task
+ int volumeId = getLowestVolumeId();
+ queryUnitAttemptId = getAndRemove(volumeId);
+ if (queryUnitAttemptId == null) {
+ if (volumeId > REMOTE) {
+ diskVolumeLoads.remove(volumeId);
+ }
+ retry--;
+ } else {
+ break;
+ }
+ } while (retry > 0);
+ }
+ return queryUnitAttemptId;
+ }
+
+ private synchronized QueryUnitAttemptId getAndRemove(int volumeId){
+ QueryUnitAttemptId queryUnitAttemptId = null;
+ if(!unassignedTaskForEachVolume.containsKey(volumeId)) return queryUnitAttemptId;
+
+ LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+ if(list != null && list.size() > 0){
+ QueryUnitAttempt queryUnitAttempt;
+ synchronized (unassignedTaskForEachVolume) {
+ Iterator<QueryUnitAttempt> iterator = list.iterator();
+ queryUnitAttempt = iterator.next();
+ iterator.remove();
+ }
+
+ this.remainTasksNum.getAndDecrement();
+ queryUnitAttemptId = queryUnitAttempt.getId();
+ for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) {
+ if (!this.getHost().equals(location.getHost())) {
+ HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
+ volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+ }
+ }
+ }
+
+ if(list == null || list.isEmpty()) {
+ unassignedTaskForEachVolume.remove(volumeId);
+ }
+ return queryUnitAttemptId;
+ }
+
+ private synchronized void removeQueryUnitAttempt(int volumeId, QueryUnitAttempt queryUnitAttempt){
+ if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
+
+ LinkedHashSet<QueryUnitAttempt> tasks = unassignedTaskForEachVolume.get(volumeId);
+
+ if(tasks != null && tasks.size() > 0){
+ tasks.remove(queryUnitAttempt);
+ remainTasksNum.getAndDecrement();
+ } else {
+ unassignedTaskForEachVolume.remove(volumeId);
+ }
+ }
+
+ /**
+ * Increase the count of running tasks and disk loads for a certain task runner.
+ *
+ * @param containerId The task runner identifier
+ * @param volumeId Volume identifier
+ * @return the volume load (i.e., how many running tasks use this volume)
+ */
+ private synchronized int increaseConcurrency(ContainerId containerId, int volumeId) {
+
+ int concurrency = 1;
+ if (diskVolumeLoads.containsKey(volumeId)) {
+ concurrency = diskVolumeLoads.get(volumeId) + 1;
+ }
+
+ if (volumeId > -1) {
+ LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
+ } else if (volumeId == -1) {
+ // the volume id is unknown: block metadata is disabled on the namenode, or the input is a compressed text file or resides on Amazon S3
+ LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
+ } else if (volumeId == REMOTE) {
+ // all local blocks on this host have been processed, so the task runner will be assigned remote tasks
+ LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
+ + ", Remote Concurrency : " + concurrency);
+ }
+ diskVolumeLoads.put(volumeId, concurrency);
+ lastAssignedVolumeId.put(containerId, volumeId);
+ return concurrency;
+ }
+
+ /**
+ * Decrease the count of running tasks of a certain task runner
+ */
+ private synchronized void decreaseConcurrency(ContainerId containerId){
+ Integer volumeId = lastAssignedVolumeId.get(containerId);
+ if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
+ Integer concurrency = diskVolumeLoads.get(volumeId);
+ if(concurrency > 0){
+ diskVolumeLoads.put(volumeId, concurrency - 1);
+ } else {
+ if (volumeId > REMOTE) {
+ diskVolumeLoads.remove(volumeId);
+ }
+ }
+ }
+ lastAssignedVolumeId.remove(containerId);
+ }
+
+ /**
+ * volume of a host : 0 ~ n
+ * compressed task, Amazon S3, unknown volume : -1
+ * remote task : -2
+ */
+ public int getLowestVolumeId(){
+ Map.Entry<Integer, Integer> volumeEntry = null;
+
+ for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
+ if(volumeEntry == null) volumeEntry = entry;
+
+ if (volumeEntry.getValue() >= entry.getValue()) {
+ volumeEntry = entry;
+ }
+ }
+
+ if(volumeEntry != null){
+ return volumeEntry.getKey();
+ } else {
+ return REMOTE;
+ }
+ }
+
+ public boolean isAssigned(ContainerId containerId){
+ return lastAssignedVolumeId.containsKey(containerId);
+ }
+
+ public boolean isRemote(ContainerId containerId){
+ Integer volumeId = lastAssignedVolumeId.get(containerId);
+ if(volumeId == null || volumeId > REMOTE){
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ public int getRemoteConcurrency(){
+ return getVolumeConcurrency(REMOTE);
+ }
+
+ public int getVolumeConcurrency(int volumeId){
+ Integer size = diskVolumeLoads.get(volumeId);
+ if(size == null) return 0;
+ else return size;
+ }
+
+ public int getRemainingLocalTaskSize(){
+ return remainTasksNum.get();
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public String getRack() {
+ return rack;
+ }
+ }
+
+ private class ScheduledRequests {
+ // The two sets leafTasks and nonLeafTasks keep all tasks to be scheduled. Even if a task is included in
+ // leafTaskHostMapping or leafTasksRackMapping, it will not be sent to a task runner
+ // unless it is also included in leafTasks or nonLeafTasks.
+ private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
+ private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
+ private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>();
+ private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping =
+ new HashMap<String, HashSet<QueryUnitAttemptId>>();
+
+ private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
+ QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
+ List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
+
+ for (DataLocation location : locations) {
+ String host = location.getHost();
+
+ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+ if (hostVolumeMapping == null) {
+ String rack = RackResolver.resolve(host).getNetworkLocation();
+ hostVolumeMapping = new HostVolumeMapping(host, rack);
+ leafTaskHostMapping.put(host, hostVolumeMapping);
+ }
+ hostVolumeMapping.addQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added attempt req to host " + host);
+ }
+
+ HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
+ if (list == null) {
+ list = new HashSet<QueryUnitAttemptId>();
+ leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
+ }
+
+ list.add(queryUnitAttempt.getId());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
+ }
+ }
+
+ leafTasks.add(queryUnitAttempt.getId());
+ }
+
+ private void addNonLeafTask(QueryUnitAttemptScheduleEvent event) {
+ nonLeafTasks.add(event.getQueryUnitAttempt().getId());
+ }
+
+ public int leafTaskNum() {
+ return leafTasks.size();
+ }
+
+ public int nonLeafTaskNum() {
+ return nonLeafTasks.size();
+ }
+
+ public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
+
+ private QueryUnitAttemptId allocateLocalTask(String host, ContainerId containerId){
+ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+ if (hostVolumeMapping != null) { // this Tajo worker is co-located with a Hadoop datanode
+ while (hostVolumeMapping.getRemainingLocalTaskSize() > 0) {
+ QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+ //find remaining local task
+ if (leafTasks.contains(attemptId)) {
+ leafTasks.remove(attemptId);
+ //LOG.info(attemptId + " Assigned based on host match " + hostName);
+ hostLocalAssigned++;
+ totalAssigned++;
+ return attemptId;
+ }
+ }
+ }
+ return null;
+ }
+
+ private QueryUnitAttemptId allocateRackTask(String host) {
+
+ List<HostVolumeMapping> remainingTasks = new ArrayList<HostVolumeMapping>(leafTaskHostMapping.values());
+ String rack = RackResolver.resolve(host).getNetworkLocation();
+ QueryUnitAttemptId attemptId = null;
+
+ if (remainingTasks.size() > 0) {
+ //find the host in this rack with the most remaining tasks
+ Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+ @Override
+ public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+ // descending remaining tasks
+ return Integer.valueOf(v2.remainTasksNum.get()).compareTo(Integer.valueOf(v1.remainTasksNum.get()));
+ }
+ });
+
+ for (HostVolumeMapping tasks : remainingTasks) {
+ while (tasks.getRemainingLocalTaskSize() > 0){
+ QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack);
+
+ if (tId == null) break;
+
+ if (leafTasks.contains(tId)) {
+ leafTasks.remove(tId);
+ attemptId = tId;
+ break;
+ }
+ }
+ if(attemptId != null) break;
+ }
+ }
+
+ //find task in rack
+ if (attemptId == null) {
+ HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+ if (list != null) {
+ synchronized (list) {
+ Iterator<QueryUnitAttemptId> iterator = list.iterator();
+ while (iterator.hasNext()) {
+ QueryUnitAttemptId tId = iterator.next();
+ iterator.remove();
+ if (leafTasks.contains(tId)) {
+ leafTasks.remove(tId);
+ attemptId = tId;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (attemptId != null) {
+ rackLocalAssigned++;
+ totalAssigned++;
+
+ LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
+ hostLocalAssigned, rackLocalAssigned, totalAssigned,
+ ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
+
+ }
+ return attemptId;
+ }
+
+ public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
+ Collections.shuffle(taskRequests);
+ LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();
+
+ TaskRequestEvent taskRequest;
+ while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
+ taskRequest = taskRequests.pollFirst();
+ if(taskRequest == null) { // if there are only remote task requests
+ taskRequest = remoteTaskRequests.pollFirst();
+ }
+
+ // checking if this container is still alive.
+ // If not, ignore the task request and stop the task runner
+ ContainerProxy container = context.getMasterContext().getResourceAllocator()
+ .getContainer(taskRequest.getContainerId());
+ if(container == null) {
+ taskRequest.getCallback().run(stopTaskRunnerReq);
+ continue;
+ }
+
+ // get the hostname of the requesting node
+ String host = container.getTaskHostName();
+
+ // if no worker matches the hostname of the task request
+ if(!leafTaskHostMapping.containsKey(host)){
+ host = NetUtils.normalizeHost(host);
+
+ if(!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()){
+ // this means one of the following cases:
+ // * there are no blocks that reside on this node, or
+ // * all blocks residing on this node are consumed, and this task runner requests a remote task.
+ // In either case, move the task request to the remote task request list and skip the rest.
+ remoteTaskRequests.add(taskRequest);
+ continue;
+ }
+ }
+
+ ContainerId containerId = taskRequest.getContainerId();
+ LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+ "containerId=" + containerId);
+
+ //////////////////////////////////////////////////////////////////////
+ // disk or host-local allocation
+ //////////////////////////////////////////////////////////////////////
+ QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId);
+
+ if (attemptId == null) { // if a local task cannot be found
+ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+ if(hostVolumeMapping != null) {
+ if(!hostVolumeMapping.isRemote(containerId)){
+ // assign to remote volume
+ hostVolumeMapping.decreaseConcurrency(containerId);
+ hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
+ }
+ // manage the remote concurrency of tail tasks
+ int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
+
+ if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
+ //release container
+ hostVolumeMapping.decreaseConcurrency(containerId);
+ taskRequest.getCallback().run(stopTaskRunnerReq);
+ subQuery.releaseContainer(containerId);
+ continue;
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////
+ // rack-local allocation
+ //////////////////////////////////////////////////////////////////////
+ attemptId = allocateRackTask(host);
+
+ //////////////////////////////////////////////////////////////////////
+ // random node allocation
+ //////////////////////////////////////////////////////////////////////
+ if (attemptId == null && leafTaskNum() > 0) {
+ synchronized (leafTasks){
+ attemptId = leafTasks.iterator().next();
+ leafTasks.remove(attemptId);
+ rackLocalAssigned++;
+ totalAssigned++;
+ LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
+ hostLocalAssigned, rackLocalAssigned, totalAssigned,
+ ((double) hostLocalAssigned / (double) totalAssigned) * 100));
+ }
+ }
+ }
+
+ if (attemptId != null) {
+ QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+ QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+ attemptId,
+ new ArrayList<FragmentProto>(task.getAllFragments()),
+ "",
+ false,
+ task.getLogicalPlan().toJson(),
+ context.getMasterContext().getQueryContext(),
+ subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
+ if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+ taskAssign.setInterQuery();
+ }
+
+ context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+ taskRequest.getContainerId(),
+ host, container.getTaskPort()));
+ assignedRequest.add(attemptId);
+
+ scheduledObjectNum -= task.getAllFragments().size();
+ taskRequest.getCallback().run(taskAssign.getProto());
+ } else {
+ throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
+ }
+ }
+ }
+
+ private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+ if (masterPlan.isRoot(block)) {
+ return false;
+ }
+
+ ExecutionBlock parent = masterPlan.getParent(block);
+ if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+ return false;
+ }
+
+ return true;
+ }
+
+ public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
+ Collections.shuffle(taskRequests);
+
+ TaskRequestEvent taskRequest;
+ while (!taskRequests.isEmpty()) {
+ taskRequest = taskRequests.pollFirst();
+ LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+
+ QueryUnitAttemptId attemptId;
+ // random allocation
+ if (nonLeafTasks.size() > 0) {
+ synchronized (nonLeafTasks){
+ attemptId = nonLeafTasks.iterator().next();
+ nonLeafTasks.remove(attemptId);
+ }
+ LOG.debug("Assigned based on * match");
+
+ QueryUnit task;
+ task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
+ QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+ attemptId,
+ Lists.newArrayList(task.getAllFragments()),
+ "",
+ false,
+ task.getLogicalPlan().toJson(),
+ context.getMasterContext().getQueryContext(),
+ subQuery.getDataChannel(),
+ subQuery.getBlock().getEnforcer());
+ if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+ taskAssign.setInterQuery();
+ }
+ for (ScanNode scan : task.getScanNodes()) {
+ Collection<URI> fetches = task.getFetch(scan);
+ if (fetches != null) {
+ for (URI fetch : fetches) {
+ taskAssign.addFetch(scan.getTableName(), fetch);
+ }
+ }
+ }
+
+ ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
+ taskRequest.getContainerId());
+ context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+ taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
+ taskRequest.getCallback().run(taskAssign.getProto());
+ totalAssigned++;
+ scheduledObjectNum--;
+ }
+ }
+ }
+ }
+}
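
For orientation, the heart of HostVolumeMapping's disk-aware assignment is getLowestVolumeId(): scan the volume-load map, return the least-loaded volume id, and fall back to REMOTE (-2) when nothing is tracked. Below is a minimal standalone sketch of just that rule; the class and method names are illustrative and not part of this patch.

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative re-creation of the selection rule in HostVolumeMapping.getLowestVolumeId():
// volume ids 0..n denote real disks, -1 means the volume is unknown
// (e.g., compressed input or S3), and -2 (REMOTE) is the fallback.
public class VolumePickSketch {
  static final int REMOTE = -2;

  static int pickLowestLoadedVolume(SortedMap<Integer, Integer> diskVolumeLoads) {
    Map.Entry<Integer, Integer> best = null;
    for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
      // '>=' lets a later entry with an equal load replace an earlier one,
      // mirroring the comparison in the patch
      if (best == null || best.getValue() >= entry.getValue()) {
        best = entry;
      }
    }
    return best != null ? best.getKey() : REMOTE;
  }

  public static void main(String[] args) {
    SortedMap<Integer, Integer> loads = new TreeMap<Integer, Integer>();
    loads.put(0, 2); // volume 0 currently runs two tasks
    loads.put(1, 1); // volume 1 currently runs one task
    System.out.println(pickLowestLoadedVolume(loads));                        // prints 1
    System.out.println(pickLowestLoadedVolume(new TreeMap<Integer, Integer>())); // prints -2
  }
}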
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
new file mode 100644
index 0000000..561f980
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FetchScheduleEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+public class FetchScheduleEvent extends TaskSchedulerEvent {
+ private final Map<String, List<URI>> fetches;
+
+ public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
+ final Map<String, List<URI>> fetches) {
+ super(eventType, blockId);
+ this.fetches = fetches;
+ }
+
+ public Map<String, List<URI>> getFetches() {
+ return fetches;
+ }
+}
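
FetchScheduleEvent above is a plain carrier of a table-name-to-fetch-URI map; DefaultTaskScheduler.handle() consumes it and builds one QueryUnit holding those fetches. A minimal construction sketch, assuming the classes from this patch are on the classpath; the table name and fetch URI are placeholders:

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.master.FetchScheduleEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;

public class FetchScheduleEventSketch {
  public static void main(String[] args) {
    ExecutionBlockId blockId =
        QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
    Map<String, List<URI>> fetches = new HashMap<String, List<URI>>();
    // placeholder table name and fetch URI
    fetches.put("lineitem", Collections.singletonList(URI.create("http://worker:8080/?fn=0")));
    FetchScheduleEvent event = new FetchScheduleEvent(EventType.T_SCHEDULE, blockId, fetches);
    System.out.println(event.getFetches().keySet()); // [lineitem]
  }
}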
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
new file mode 100644
index 0000000..598b1c5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Objects;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+/**
+ * FragmentPair consists of two fragments, a left fragment and a right fragment.
+ * Its contents depend on the query type.
+ * For join queries, it is assumed to have both fragments.
+ * Also, the left fragment is assumed to be a fragment of the larger table.
+ * For other queries, it is assumed to have only a left fragment.
+ */
+public class FragmentPair {
+ private FileFragment leftFragment;
+ private FileFragment rightFragment;
+
+ public FragmentPair(FileFragment left) {
+ this.leftFragment = left;
+ }
+
+ public FragmentPair(FileFragment left, FileFragment right) {
+ this.leftFragment = left;
+ this.rightFragment = right;
+ }
+
+ public FileFragment getLeftFragment() {
+ return leftFragment;
+ }
+
+ public FileFragment getRightFragment() {
+ return rightFragment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof FragmentPair) {
+ FragmentPair other = (FragmentPair) o;
+ // the right fragments are equal if both are null or both are present and equal
+ return this.leftFragment.equals(other.leftFragment)
+ && Objects.equal(this.rightFragment, other.rightFragment);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(leftFragment, rightFragment);
+ }
+}
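
The class comment's convention (both fragments for joins with the larger table on the left, a single left fragment otherwise) maps directly onto the two constructors. A small sketch of which constructor fits which case; obtaining the FileFragment instances from the storage manager is out of scope here, so the helper names below are illustrative only:

import org.apache.tajo.master.FragmentPair;
import org.apache.tajo.storage.fragment.FileFragment;

public class FragmentPairUsageSketch {
  // scan-like queries: only a left fragment
  static FragmentPair forScan(FileFragment only) {
    return new FragmentPair(only); // right fragment stays null
  }

  // join queries: the larger table's fragment goes on the left
  static FragmentPair forJoin(FileFragment largerTable, FileFragment smallerTable) {
    return new FragmentPair(largerTable, smallerTable);
  }
}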
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
new file mode 100644
index 0000000..10d993d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+/**
+ * FragmentScheduleAlgorithm is used by LazyTaskScheduler.
+ * It selects a fragment for a given host, disk, or rack.
+ *
+ * There are two implementations: DefaultFragmentScheduleAlgorithm and GreedyFragmentScheduleAlgorithm.
+ */
+public interface FragmentScheduleAlgorithm {
+ void addFragment(FragmentPair fragmentPair);
+ void removeFragment(FragmentPair fragmentPair);
+
+ FragmentPair getHostLocalFragment(String host);
+ FragmentPair getHostLocalFragment(String host, Integer diskId);
+ FragmentPair getRackLocalFragment(String host);
+ FragmentPair getRandomFragment();
+ FragmentPair[] getAllFragments();
+
+ int size();
+}
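
Neither of the two named implementations appears in this part of the diff. As a minimal sketch of the contract only, the following implementation keeps no locality information at all, so every lookup degrades to a random pick; it is not one of the real implementations:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.tajo.master.FragmentPair;
import org.apache.tajo.master.FragmentScheduleAlgorithm;

public class RandomOnlyScheduleAlgorithm implements FragmentScheduleAlgorithm {
  private final List<FragmentPair> fragments = new ArrayList<FragmentPair>();
  private final Random random = new Random();

  @Override
  public void addFragment(FragmentPair fragmentPair) {
    fragments.add(fragmentPair);
  }

  @Override
  public void removeFragment(FragmentPair fragmentPair) {
    fragments.remove(fragmentPair); // relies on FragmentPair.equals()
  }

  // no locality index is kept, so host-, disk-, and rack-local requests
  // all fall back to a random choice in this sketch
  @Override
  public FragmentPair getHostLocalFragment(String host) {
    return getRandomFragment();
  }

  @Override
  public FragmentPair getHostLocalFragment(String host, Integer diskId) {
    return getRandomFragment();
  }

  @Override
  public FragmentPair getRackLocalFragment(String host) {
    return getRandomFragment();
  }

  @Override
  public FragmentPair getRandomFragment() {
    return fragments.isEmpty() ? null : fragments.get(random.nextInt(fragments.size()));
  }

  @Override
  public FragmentPair[] getAllFragments() {
    return fragments.toArray(new FragmentPair[fragments.size()]);
  }

  @Override
  public int size() {
    return fragments.size();
  }
}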
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
new file mode 100644
index 0000000..820a0fb
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+public class FragmentScheduleAlgorithmFactory {
+
+ private static Class<? extends FragmentScheduleAlgorithm> CACHED_ALGORITHM_CLASS;
+ private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+ private static final Class<?>[] DEFAULT_PARAMS = {};
+
+ public static Class<? extends FragmentScheduleAlgorithm> getScheduleAlgorithmClass(Configuration conf)
+ throws IOException {
+ if (CACHED_ALGORITHM_CLASS != null) {
+ return CACHED_ALGORITHM_CLASS;
+ } else {
+ CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.lazy-task-scheduler.algorithm", null,
+ FragmentScheduleAlgorithm.class);
+ }
+
+ if (CACHED_ALGORITHM_CLASS == null) {
+ throw new IOException("Scheduler algorithm is null");
+ }
+ return CACHED_ALGORITHM_CLASS;
+ }
+
+ public static <T extends FragmentScheduleAlgorithm> T get(Class<T> clazz) {
+ T result;
+ try {
+ Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
+ if (constructor == null) {
+ constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS);
+ constructor.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(clazz, constructor);
+ }
+ result = constructor.newInstance(new Object[]{});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return result;
+ }
+
+ public static FragmentScheduleAlgorithm get(Configuration conf) throws IOException {
+ return get(getScheduleAlgorithmClass(conf));
+ }
+}
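
Putting the factory to work: it reads the implementation class from 'tajo.querymaster.lazy-task-scheduler.algorithm' and instantiates it through a cached no-arg constructor. A usage sketch, assuming the RandomOnlyScheduleAlgorithm sketch from the previous section is on the classpath:

import org.apache.hadoop.conf.Configuration;

import org.apache.tajo.master.FragmentScheduleAlgorithm;
import org.apache.tajo.master.FragmentScheduleAlgorithmFactory;

public class ScheduleAlgorithmFactorySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical implementation class; any FragmentScheduleAlgorithm with a
    // no-arg constructor works here
    conf.set("tajo.querymaster.lazy-task-scheduler.algorithm",
        "RandomOnlyScheduleAlgorithm");
    FragmentScheduleAlgorithm algorithm = FragmentScheduleAlgorithmFactory.get(conf);
    System.out.println(algorithm.getClass().getSimpleName()); // RandomOnlyScheduleAlgorithm
  }
}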
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
new file mode 100644
index 0000000..8acf2b2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.algebra.AlterTablespaceSetType;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.exception.IllegalQueryStatusException;
+import org.apache.tajo.engine.exception.VerifyException;
+import org.apache.tajo.engine.parser.HiveQLAnalyzer;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
+import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
+import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
+
+public class GlobalEngine extends AbstractService {
+ /** Class Logger */
+ private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
+
+ private final MasterContext context;
+ private final AbstractStorageManager sm;
+
+ private SQLAnalyzer analyzer;
+ private HiveQLAnalyzer converter;
+ private CatalogService catalog;
+ private PreLogicalPlanVerifier preVerifier;
+ private LogicalPlanner planner;
+ private LogicalOptimizer optimizer;
+ private LogicalPlanVerifier annotatedPlanVerifier;
+ private DistributedQueryHookManager hookManager;
+
+ public GlobalEngine(final MasterContext context) {
+ super(GlobalEngine.class.getName());
+ this.context = context;
+ this.catalog = context.getCatalog();
+ this.sm = context.getStorageManager();
+ }
+
+ public void start() {
+ try {
+ analyzer = new SQLAnalyzer();
+ converter = new HiveQLAnalyzer();
+ preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
+ planner = new LogicalPlanner(context.getCatalog());
+ optimizer = new LogicalOptimizer(context.getConf());
+ annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
+
+ hookManager = new DistributedQueryHookManager();
+ hookManager.addHook(new CreateTableHook());
+ hookManager.addHook(new InsertHook());
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ super.start();
+ }
+
+ public void stop() {
+ super.stop();
+ }
+
+ public SubmitQueryResponse executeQuery(Session session, String sql)
+ throws InterruptedException, IOException, IllegalQueryStatusException {
+
+ LOG.info("SQL: " + sql);
+ QueryContext queryContext = new QueryContext();
+
+ try {
+ // setting environment variables
+ String [] cmds = sql.split(" ");
+ if(cmds != null) {
+ if(cmds[0].equalsIgnoreCase("set")) {
+ String[] params = cmds[1].split("=");
+ context.getConf().set(params[0], params[1]);
+ SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
+ responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ return responseBuilder.build();
+ }
+ }
+
+ final boolean hiveQueryMode = context.getConf().getBoolVar(TajoConf.ConfVars.HIVE_QUERY_MODE);
+ LOG.info("hive.query.mode:" + hiveQueryMode);
+
+ if (hiveQueryMode) {
+ context.getSystemMetrics().counter("Query", "numHiveMode").inc();
+ queryContext.setHiveQueryMode();
+ }
+
+ context.getSystemMetrics().counter("Query", "totalQuery").inc();
+
+ Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
+ LogicalPlan plan = createLogicalPlan(session, planningContext);
+ SubmitQueryResponse response = executeQueryInternal(queryContext, session, plan, sql);
+ return response;
+ } catch (Throwable t) {
+ context.getSystemMetrics().counter("Query", "errorQuery").inc();
+ LOG.error("\nStack Trace:\n" + StringUtils.stringifyException(t));
+ SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
+ responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setIsForwarded(true);
+ responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+ String errorMessage = t.getMessage();
+ if (t.getMessage() == null) {
+ errorMessage = StringUtils.stringifyException(t);
+ }
+ responseBuilder.setErrorMessage(errorMessage);
+ return responseBuilder.build();
+ }
+ }
+
+ private SubmitQueryResponse executeQueryInternal(QueryContext queryContext,
+ Session session,
+ LogicalPlan plan,
+ String sql) throws Exception {
+
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+ SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
+ responseBuilder.setIsForwarded(false);
+ responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
+
+ if (PlannerUtil.checkIfDDLPlan(rootNode)) {
+ context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
+ updateQuery(session, rootNode.getChild());
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+
+ } else if (plan.isExplain()) { // explain query
+ String explainStr = PlannerUtil.buildExplainString(plan.getRootBlock().getRoot());
+ Schema schema = new Schema();
+ schema.addColumn("explain", TajoDataTypes.Type.TEXT);
+ RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+
+ SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
+
+ VTuple tuple = new VTuple(1);
+ String[] lines = explainStr.split("\n");
+ int bytesNum = 0;
+ for (String line : lines) {
+ tuple.put(0, DatumFactory.createText(line));
+ byte [] encodedData = encoder.toBytes(tuple);
+ bytesNum += encodedData.length;
+ serializedResBuilder.addSerializedTuples(ByteString.copyFrom(encodedData));
+ }
+ serializedResBuilder.setSchema(schema.getProto());
+ serializedResBuilder.setBytesNum(bytesNum);
+
+ responseBuilder.setResultSet(serializedResBuilder.build());
+ responseBuilder.setMaxRowNum(lines.length);
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+
+ // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
+ } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
+ ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
+ TableDesc desc = scanNode.getTableDesc();
+ if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
+ LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
+ responseBuilder.setMaxRowNum((int) limitNode.getFetchFirstNum());
+ } else {
+ if (desc.getStats().getNumBytes() > 0 && desc.getStats().getNumRows() == 0) {
+ responseBuilder.setMaxRowNum(Integer.MAX_VALUE);
+ }
+ }
+ responseBuilder.setTableDesc(desc.getProto());
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+
+ // NonFromQuery indicates a form of 'select a, x+y;'
+ } else if (PlannerUtil.checkIfNonFromQuery(plan)) {
+ Target [] targets = plan.getRootBlock().getRawTargets();
+ if (targets == null) {
+ throw new PlanningException("No targets");
+ }
+ Tuple outTuple = new VTuple(targets.length);
+ for (int i = 0; i < targets.length; i++) {
+ EvalNode eval = targets[i].getEvalTree();
+ outTuple.put(i, eval.eval(null, null));
+ }
+
+ Schema schema = PlannerUtil.targetToSchema(targets);
+ RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
+ byte [] serializedBytes = encoder.toBytes(outTuple);
+ SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder();
+ serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
+ serializedResBuilder.setSchema(schema.getProto());
+ serializedResBuilder.setBytesNum(serializedBytes.length);
+
+ responseBuilder.setResultSet(serializedResBuilder);
+ responseBuilder.setMaxRowNum(1);
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+
+ } else { // it requires distributed execution. So, the query is forwarded to a query master.
+ context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
+ hookManager.doHooks(queryContext, plan);
+
+ QueryJobManager queryJobManager = this.context.getQueryJobManager();
+ QueryInfo queryInfo;
+
+ queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, rootNode);
+
+ if(queryInfo == null) {
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+ responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+ } else {
+ responseBuilder.setIsForwarded(true);
+ responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
+ responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ if(queryInfo.getQueryMasterHost() != null) {
+ responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+ }
+ responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+ LOG.info("Query is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
+ }
+ }
+ SubmitQueryResponse response = responseBuilder.build();
+ return response;
+ }
+
+ public QueryId updateQuery(Session session, String sql) throws IOException, SQLException, PlanningException {
+ try {
+ LOG.info("SQL: " + sql);
+ // parse the query
+ Expr expr = analyzer.parse(sql);
+
+ LogicalPlan plan = createLogicalPlan(session, expr);
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+ if (!PlannerUtil.checkIfDDLPlan(rootNode)) {
+ throw new SQLException("This is not update query:\n" + sql);
+ } else {
+ updateQuery(session, rootNode.getChild());
+ return QueryIdFactory.NULL_QUERY_ID;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+
+ private boolean updateQuery(Session session, LogicalNode root) throws IOException {
+
+ switch (root.getType()) {
+ case CREATE_DATABASE:
+ CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
+ createDatabase(session, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+ return true;
+ case DROP_DATABASE:
+ DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
+ dropDatabase(session, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+ return true;
+ case CREATE_TABLE:
+ CreateTableNode createTable = (CreateTableNode) root;
+ createTable(session, createTable, createTable.isIfNotExists());
+ return true;
+ case DROP_TABLE:
+ DropTableNode dropTable = (DropTableNode) root;
+ dropTable(session, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
+ return true;
+ case ALTER_TABLESPACE:
+ AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
+ alterTablespace(session, alterTablespace);
+ return true;
+ case ALTER_TABLE:
+ AlterTableNode alterTable = (AlterTableNode) root;
+ alterTable(session,alterTable);
+ return true;
+ default:
+ throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
+ }
+ }
+
+ private LogicalPlan createLogicalPlan(Session session, Expr expression) throws PlanningException {
+
+ VerificationState state = new VerificationState();
+ preVerifier.verify(session, state, expression);
+ if (!state.verified()) {
+ StringBuilder sb = new StringBuilder();
+ for (String error : state.getErrorMessages()) {
+ sb.append(error).append("\n");
+ }
+ throw new VerifyException(sb.toString());
+ }
+
+ LogicalPlan plan = planner.createPlan(session, expression);
+ LOG.info("=============================================");
+ LOG.info("Non Optimized Query: \n" + plan.toString());
+ LOG.info("=============================================");
+ optimizer.optimize(plan);
+ LOG.info("=============================================");
+ LOG.info("Optimized Query: \n" + plan.toString());
+ LOG.info("=============================================");
+
+ annotatedPlanVerifier.verify(session, state, plan);
+
+ if (!state.verified()) {
+ StringBuilder sb = new StringBuilder();
+ for (String error : state.getErrorMessages()) {
+ sb.append(error).append("\n");
+ }
+ throw new VerifyException(sb.toString());
+ }
+
+ return plan;
+ }
+
+ /**
+ * Alter a given tablespace
+ */
+ public void alterTablespace(final Session session, final AlterTablespaceNode alterTablespace) {
+
+ final CatalogService catalog = context.getCatalog();
+ final String spaceName = alterTablespace.getTablespaceName();
+
+ AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder();
+ builder.setSpaceName(spaceName);
+ if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) {
+ AlterTablespaceCommand.Builder commandBuilder = AlterTablespaceCommand.newBuilder();
+ commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION);
+ commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation()));
+ builder.addCommand(commandBuilder.build());
+ } else {
+ throw new RuntimeException("This 'ALTER TABLESPACE' is not supported yet.");
+ }
+
+ catalog.alterTablespace(builder.build());
+ }
+
+ /**
+ * Alter a given table: rename the table, rename a column, or add a new
+ * column. Renaming a managed table also renames its data directory under
+ * the warehouse.
+ */
+ public void alterTable(final Session session, final AlterTableNode alterTable) throws IOException {
+
+ final CatalogService catalog = context.getCatalog();
+ final String tableName = alterTable.getTableName();
+
+ String databaseName;
+ String simpleTableName;
+ if (CatalogUtil.isFQTableName(tableName)) {
+ String[] split = CatalogUtil.splitFQTableName(tableName);
+ databaseName = split[0];
+ simpleTableName = split[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ simpleTableName = tableName;
+ }
+ final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+ if (!catalog.existsTable(databaseName, simpleTableName)) {
+ throw new NoSuchTableException(qualifiedName);
+ }
+
+ switch (alterTable.getAlterTableOpType()) {
+ case RENAME_TABLE:
+ if (catalog.existsTable(databaseName, alterTable.getNewTableName())) {
+ throw new AlreadyExistsTableException(alterTable.getNewTableName());
+ }
+
+ TableDesc desc = catalog.getTableDesc(databaseName, simpleTableName);
+
+ if (!desc.isExternal()) { // if the table is the managed table
+ Path oldPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
+ databaseName, simpleTableName);
+ Path newPath = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR),
+ databaseName, alterTable.getNewTableName());
+ FileSystem fs = oldPath.getFileSystem(context.getConf());
+
+ if (!fs.exists(oldPath)) {
+ throw new IOException("No such table directory: " + oldPath);
+ }
+ if (fs.exists(newPath)) {
+ throw new IOException("Table directory already exists: " + newPath);
+ }
+
+ if (!fs.rename(oldPath, newPath)) {
+ throw new IOException("Cannot rename " + oldPath + " to " + newPath);
+ }
+ }
+ catalog.alterTable(CatalogUtil.renameTable(qualifiedName, alterTable.getNewTableName(),
+ AlterTableType.RENAME_TABLE));
+ break;
+ case RENAME_COLUMN:
+ if (existColumnName(qualifiedName, alterTable.getNewColumnName())) {
+ throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName());
+ }
+ catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(),
+ alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN));
+ break;
+ case ADD_COLUMN:
+ if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) {
+ throw new ColumnNameAlreadyExistException(alterTable.getAddNewColumn().getSimpleName());
+ }
+ catalog.alterTable(CatalogUtil.addNewColumn(qualifiedName, alterTable.getAddNewColumn(), AlterTableType.ADD_COLUMN));
+ break;
+ default:
+ // TODO: reject unsupported ALTER TABLE operation types explicitly
+ }
+ }
+
+ private boolean existColumnName(String tableName, String columnName) {
+ final TableDesc tableDesc = catalog.getTableDesc(tableName);
+ return tableDesc.getSchema().containsByName(columnName);
+ }
+
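+ /**
+ * Prepares the table meta and the table path for a CREATE TABLE
+ * statement. An external table must carry an explicit LOCATION, while a
+ * managed table is placed under ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME}.
+ */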
+ private TableDesc createTable(Session session, CreateTableNode createTable, boolean ifNotExists) throws IOException {
+ TableMeta meta;
+
+ if (createTable.hasOptions()) {
+ meta = CatalogUtil.newTableMeta(createTable.getStorageType(), createTable.getOptions());
+ } else {
+ meta = CatalogUtil.newTableMeta(createTable.getStorageType());
+ }
+
+ if (createTable.isExternal()) {
+ Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
+ } else {
+ String databaseName;
+ String tableName;
+ if (CatalogUtil.isFQTableName(createTable.getTableName())) {
+ databaseName = CatalogUtil.extractQualifier(createTable.getTableName());
+ tableName = CatalogUtil.extractSimpleName(createTable.getTableName());
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = createTable.getTableName();
+ }
+
+ // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
+ Path tablePath = StorageUtil.concatPath(sm.getWarehouseDir(), databaseName, tableName);
+ createTable.setPath(tablePath);
+ }
+
+ return createTableOnPath(session, createTable.getTableName(), createTable.getTableSchema(),
+ meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists);
+ }
+
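+ /**
+ * Registers a table in the catalog at the given path. For an external
+ * table the path must already exist; for a managed table the directory
+ * is created here. When the table already exists, ifNotExists decides
+ * between returning the existing descriptor and throwing an exception.
+ */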
+ public TableDesc createTableOnPath(Session session, String tableName, Schema schema, TableMeta meta,
+ Path path, boolean isExternal, PartitionMethodDesc partitionDesc,
+ boolean ifNotExists)
+ throws IOException {
+ String databaseName;
+ String simpleTableName;
+ if (CatalogUtil.isFQTableName(tableName)) {
+ String [] splitted = CatalogUtil.splitFQTableName(tableName);
+ databaseName = splitted[0];
+ simpleTableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ simpleTableName = tableName;
+ }
+ String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+ boolean exists = catalog.existsTable(databaseName, simpleTableName);
+
+ if (exists) {
+ if (ifNotExists) {
+ LOG.info("relation \"" + qualifiedName + "\" is already exists." );
+ return catalog.getTableDesc(databaseName, simpleTableName);
+ } else {
+ throw new AlreadyExistsTableException(qualifiedName);
+ }
+ }
+
+ FileSystem fs = path.getFileSystem(context.getConf());
+
+ if (isExternal) {
+ if(!fs.exists(path)) {
+ LOG.error("ERROR: " + path.toUri() + " does not exist");
+ throw new IOException("ERROR: " + path.toUri() + " does not exist");
+ }
+ } else {
+ if (!fs.mkdirs(path)) {
+ throw new IOException("Cannot create the table directory: " + path);
+ }
+ }
+
+ long totalSize = 0;
+
+ try {
+ totalSize = sm.calculateSize(path);
+ } catch (IOException e) {
+ LOG.warn("Cannot calculate the size of the relation", e);
+ }
+
+ TableStats stats = new TableStats();
+ stats.setNumBytes(totalSize);
+ TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
+ schema, meta, path, isExternal);
+ desc.setStats(stats);
+ if (partitionDesc != null) {
+ desc.setPartitionMethod(partitionDesc);
+ }
+
+ if (catalog.createTable(desc)) {
+ LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
+ return desc;
+ } else {
+ LOG.info("Table creation " + tableName + " is failed.");
+ throw new CatalogException("Cannot create table \"" + tableName + "\".");
+ }
+ }
+
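+ /**
+ * Creates a database and its directory under the warehouse. When no
+ * tablespace is given, the default tablespace is used. With ifNotExists
+ * set (CREATE DATABASE IF NOT EXISTS), an existing database is not an
+ * error.
+ */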
+ public boolean createDatabase(@Nullable Session session, String databaseName,
+ @Nullable String tablespace,
+ boolean ifNotExists) throws IOException {
+
+ String tablespaceName;
+ if (tablespace == null) {
+ tablespaceName = DEFAULT_TABLESPACE_NAME;
+ } else {
+ tablespaceName = tablespace;
+ }
+
+ // CREATE DATABASE IF NOT EXISTS
+ boolean exists = catalog.existDatabase(databaseName);
+ if (exists) {
+ if (ifNotExists) {
+ LOG.info("database \"" + databaseName + "\" is already exists." );
+ return true;
+ } else {
+ throw new AlreadyExistsDatabaseException(databaseName);
+ }
+ }
+
+ if (catalog.createDatabase(databaseName, tablespaceName)) {
+ Path databaseDir = StorageUtil.concatPath(context.getConf().getVar(TajoConf.ConfVars.WAREHOUSE_DIR), databaseName);
+ FileSystem fs = databaseDir.getFileSystem(context.getConf());
+ if (!fs.mkdirs(databaseDir)) {
+ throw new IOException("Cannot create the database directory: " + databaseDir);
+ }
+ }
+
+ return true;
+ }
+
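+ /**
+ * Drops a database. With ifExists set (DROP DATABASE IF EXISTS), a
+ * missing database is not an error. The session's currently open
+ * database cannot be dropped.
+ */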
+ public boolean dropDatabase(Session session, String databaseName, boolean ifExists) {
+
+ boolean exists = catalog.existDatabase(databaseName);
+ if(!exists) {
+ if (ifExists) { // DROP DATABASE IF EXISTS
+ LOG.info("database \"" + databaseName + "\" does not exists." );
+ return true;
+ } else { // Otherwise, it causes an exception.
+ throw new NoSuchDatabaseException(databaseName);
+ }
+ }
+
+ if (session.getCurrentDatabase().equals(databaseName)) {
+ throw new RuntimeException("ERROR: Cannot drop the current open database");
+ }
+
+ boolean result = catalog.dropDatabase(databaseName);
+ LOG.info("database " + databaseName + " is dropped.");
+ return result;
+ }
+
+ /**
+ * Drop a given named table.
+ *
+ * @param session the session issuing the statement
+ * @param tableName the name of the table to be dropped
+ * @param ifExists if true, a missing table is not an error
+ * @param purge if true, the table data is also removed from the file system
+ */
+ public boolean dropTable(Session session, String tableName, boolean ifExists, boolean purge) {
+ CatalogService catalog = context.getCatalog();
+
+ String databaseName;
+ String simpleTableName;
+ if (CatalogUtil.isFQTableName(tableName)) {
+ String [] splitted = CatalogUtil.splitFQTableName(tableName);
+ databaseName = splitted[0];
+ simpleTableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ simpleTableName = tableName;
+ }
+ String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
+
+ boolean exists = catalog.existsTable(qualifiedName);
+ if(!exists) {
+ if (ifExists) { // DROP TABLE IF EXISTS
+ LOG.info("relation \"" + qualifiedName + "\" is already exists." );
+ return true;
+ } else { // Otherwise, it causes an exception.
+ throw new NoSuchTableException(qualifiedName);
+ }
+ }
+
+ Path path = catalog.getTableDesc(qualifiedName).getPath();
+ catalog.dropTable(qualifiedName);
+
+ if (purge) {
+ try {
+ FileSystem fs = path.getFileSystem(context.getConf());
+ fs.delete(path, true);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to purge the table data: " + path, e);
+ }
+ }
+
+ LOG.info(String.format("relation \"%s\" is %s.", qualifiedName, purge ? "purged" : "dropped"));
+ return true;
+ }
+
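+ /**
+ * A hook that adjusts the QueryContext before a distributed query is
+ * launched: isEligible() decides whether the hook applies to a given
+ * plan, and hook() performs the adjustment.
+ */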
+ public interface DistributedQueryHook {
+ boolean isEligible(QueryContext queryContext, LogicalPlan plan);
+ void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
+ }
+
+ public static class DistributedQueryHookManager {
+ private List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>();
+ public void addHook(DistributedQueryHook hook) {
+ hooks.add(hook);
+ }
+
+ public void doHooks(QueryContext queryContext, LogicalPlan plan) {
+ for (DistributedQueryHook hook : hooks) {
+ if (hook.isEligible(queryContext, plan)) {
+ try {
+ hook.hook(queryContext, plan);
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ }
+ }
+ }
+ }
+
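+ /**
+ * Applies when the root of a distributed plan is a CREATE_TABLE node
+ * (i.e., CREATE TABLE ... AS SELECT): it records the output table name,
+ * output path, and any partition method in the QueryContext.
+ */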
+ public class CreateTableHook implements DistributedQueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ return rootNode.getChild().getType() == NodeType.CREATE_TABLE;
+ }
+
+ @Override
+ public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ CreateTableNode createTableNode = rootNode.getChild();
+ String [] splitted = CatalogUtil.splitFQTableName(createTableNode.getTableName());
+ String databaseName = splitted[0];
+ String tableName = splitted[1];
+ queryContext.setOutputTable(tableName);
+ queryContext.setOutputPath(
+ StorageUtil.concatPath(TajoConf.getWarehouseDir(context.getConf()), databaseName, tableName));
+ if(createTableNode.getPartitionMethod() != null) {
+ queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
+ }
+ queryContext.setCreateTable();
+ }
+ }
+
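+ /**
+ * Applies to INSERT plans: marks the context as an insert and sets the
+ * output target, either a table (INSERT INTO table) or a raw path
+ * (INSERT INTO LOCATION), along with the overwrite flag.
+ */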
+ public static class InsertHook implements DistributedQueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
+ return plan.getRootBlock().getRootType() == NodeType.INSERT;
+ }
+
+ @Override
+ public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception {
+ queryContext.setInsert();
+
+ InsertNode insertNode = plan.getRootBlock().getNode(NodeType.INSERT);
+
+ // Set QueryContext settings, such as the output table name and output path.
+ // It also removes existing data files if overwrite is true.
+ if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
+ queryContext.setOutputTable(insertNode.getTableName());
+ queryContext.setOutputPath(insertNode.getPath());
+ } else { // INSERT INTO LOCATION ...
+ // For INSERT INTO LOCATION, the output table must not be set.
+ Path outputPath = insertNode.getPath();
+ queryContext.setFileOutput();
+ queryContext.setOutputPath(outputPath);
+ }
+
+ if (insertNode.isOverwrite()) {
+ queryContext.setOutputOverwrite();
+ }
+ }
+ }
+}