You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:17:32 UTC

[15/16] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 0000000,bab5903..742665a
mode 000000,100644..100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@@ -1,0 -1,638 +1,650 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.querymaster;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.service.CompositeService;
+ import org.apache.hadoop.util.StringUtils;
+ import org.apache.hadoop.yarn.event.AsyncDispatcher;
+ import org.apache.hadoop.yarn.event.EventHandler;
+ import org.apache.hadoop.yarn.util.Clock;
+ import org.apache.tajo.*;
+ import org.apache.tajo.algebra.Expr;
+ import org.apache.tajo.algebra.JsonHelper;
+ import org.apache.tajo.catalog.CatalogService;
+ import org.apache.tajo.catalog.TableDesc;
+ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+ import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.plan.LogicalOptimizer;
++import org.apache.tajo.plan.LogicalPlan;
++import org.apache.tajo.plan.LogicalPlanner;
++import org.apache.tajo.plan.logical.LogicalRootNode;
++import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
++import org.apache.tajo.plan.util.PlannerUtil;
+ import org.apache.tajo.engine.planner.global.MasterPlan;
+ import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.exception.UnimplementedException;
+ import org.apache.tajo.ha.HAServiceUtil;
+ import org.apache.tajo.ipc.TajoMasterProtocol;
+ import org.apache.tajo.ipc.TajoWorkerProtocol;
+ import org.apache.tajo.master.TajoContainerProxy;
+ import org.apache.tajo.master.event.*;
+ import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+ import org.apache.tajo.session.Session;
+ import org.apache.tajo.plan.LogicalOptimizer;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.LogicalPlanner;
+ import org.apache.tajo.plan.logical.LogicalNode;
 -import org.apache.tajo.plan.logical.LogicalRootNode;
+ import org.apache.tajo.plan.logical.NodeType;
+ import org.apache.tajo.plan.logical.ScanNode;
 -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 -import org.apache.tajo.plan.util.PlannerUtil;
+ import org.apache.tajo.plan.verifier.VerifyException;
+ import org.apache.tajo.rpc.NettyClientBase;
+ import org.apache.tajo.rpc.RpcConnectionPool;
+ import org.apache.tajo.storage.StorageManager;
+ import org.apache.tajo.storage.StorageProperty;
+ import org.apache.tajo.storage.StorageUtil;
+ import org.apache.tajo.util.metrics.TajoMetrics;
+ import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
+ import org.apache.tajo.worker.AbstractResourceAllocator;
+ import org.apache.tajo.worker.TajoResourceAllocator;
+ 
+ import java.io.IOException;
+ import java.util.*;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ import static org.apache.tajo.TajoProtos.QueryState;
+ 
+ public class QueryMasterTask extends CompositeService {
+   private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
+ 
+   // query submission directory is private!
+   final public static FsPermission STAGING_DIR_PERMISSION =
+       FsPermission.createImmutable((short) 0700); // rwx--------
+ 
+   public static final String TMP_STAGING_DIR_PREFIX = ".staging";
+ 
+   private QueryId queryId;
+ 
+   private Session session;
+ 
+   private QueryContext queryContext;
+ 
+   private QueryMasterTaskContext queryTaskContext;
+ 
+   private QueryMaster.QueryMasterContext queryMasterContext;
+ 
+   private Query query;
+ 
+   private MasterPlan masterPlan;
+ 
+   private String jsonExpr;
+ 
+   private String logicalPlanJson;
+ 
+   private AsyncDispatcher dispatcher;
+ 
+   private final long querySubmitTime;
+ 
+   private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
+ 
+   private TajoConf systemConf;
+ 
+   private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
+ 
+   private AbstractResourceAllocator resourceAllocator;
+ 
+   private AtomicBoolean stopped = new AtomicBoolean(false);
+ 
+   private TajoMetrics queryMetrics;
+ 
+   private Throwable initError;
+ 
+   private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
+       new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
+ 
+   /**
+    * Creates the task that drives a single query on this query master.
+    * The submit time is taken from the wall clock at construction.
+    *
+    * @param queryMasterContext context of the owning query master
+    * @param queryId            identifier of the query
+    * @param session            session the query was submitted in
+    * @param queryContext       per-query context
+    * @param jsonExpr           JSON form of the parsed expression; deserialized by {@code startQuery()}
+    * @param logicalPlanJson    JSON form of the logical plan; only stored by this constructor
+    */
+   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
+                          QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
+                          String logicalPlanJson) {
+ 
+     super(QueryMasterTask.class.getName());
+ 
+     // identity of the query
+     this.queryId = queryId;
+     this.session = session;
+ 
+     // surrounding contexts
+     this.queryMasterContext = queryMasterContext;
+     this.queryContext = queryContext;
+ 
+     // serialized plans
+     this.jsonExpr = jsonExpr;
+     this.logicalPlanJson = logicalPlanJson;
+ 
+     this.querySubmitTime = System.currentTimeMillis();
+   }
+ 
+   @Override
+   public void init(Configuration conf) {
+     systemConf = (TajoConf)conf;
+ 
+     try {
+       queryTaskContext = new QueryMasterTaskContext();
+       String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
+ 
+       if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
+         resourceAllocator = new TajoResourceAllocator(queryTaskContext);
+       } else {
+         throw new UnimplementedException(resourceManagerClassName + " is not supported yet");
+       }
+       addService(resourceAllocator);
+ 
+       dispatcher = new AsyncDispatcher();
+       addService(dispatcher);
+ 
+       dispatcher.register(StageEventType.class, new StageEventDispatcher());
+       dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+       dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+       dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
+       dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+       dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
+ 
+       initStagingDir();
+ 
+       queryMetrics = new TajoMetrics(queryId.toString());
+ 
+       super.init(systemConf);
+     } catch (Throwable t) {
+       LOG.error(t.getMessage(), t);
+       initError = t;
+     }
+   }
+ 
+   /** @return {@code true} once {@link #stop()} has been entered */
+   public boolean isStopped() {
+     return stopped.get();
+   }
+ 
+   /**
+    * Plans and launches the query via {@link #startQuery()}, then starts the child services.
+    */
+   @Override
+   public void start() {
+     startQuery();
+     super.start();
+   }
+ 
+   /**
+    * Shuts this task down at most once: stops the resource allocator, obtains and releases a
+    * TajoMaster connection, stops the child services and reports the query metrics to the
+    * console reporter. Calls after the first return immediately.
+    */
+   @Override
+   public void stop() {
+ 
+     // getAndSet lets only the first caller proceed.
+     if(stopped.getAndSet(true)) {
+       return;
+     }
+ 
+     LOG.info("Stopping QueryMasterTask:" + queryId);
+ 
+     try {
+       resourceAllocator.stop();
+     } catch (Throwable t) {
+       // Best effort: any failure here (including a null allocator when init() failed early)
+       // is logged and the shutdown continues.
+       LOG.fatal(t.getMessage(), t);
+     }
+ 
+     RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
+     NettyClientBase tmClient = null;
+     // NOTE(review): tmClient is acquired and released without any call being made on it in
+     // this method; only the HA address refresh below has a visible effect. Confirm whether a
+     // report to the master is still intended (see the TODO further down).
+     try {
+       // In TajoMaster HA mode, if backup master be active status,
+       // worker may fail to connect existing active master. Thus,
+       // if worker can't connect the master, worker should try to connect another master and
+       // update master address in worker context.
+       if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+         try {
+           tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+               TajoMasterProtocol.class, true);
+         } catch (Exception e) {
+           // Refresh both master addresses from the HA service, then retry once.
+           queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
+               HAServiceUtil.getResourceTrackerAddress(systemConf));
+           queryMasterContext.getWorkerContext().setTajoMasterAddress(
+               HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+           tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+               TajoMasterProtocol.class, true);
+         }
+       } else {
+         tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+             TajoMasterProtocol.class, true);
+       }
+     } catch (Exception e) {
+       LOG.error(e.getMessage(), e);
+     } finally {
+       // tmClient is still null here when no connection could be obtained.
+       connPool.releaseConnection(tmClient);
+     }
+ 
+     super.stop();
+ 
+     //TODO change report to tajo master
+     // queryMetrics is null when init() failed before creating it.
+     if (queryMetrics != null) {
+       queryMetrics.report(new MetricsConsoleReporter());
+     }
+ 
+     LOG.info("Stopped QueryMasterTask:" + queryId);
+   }
+ 
+   /**
+    * Forwards a task request to the stage named by the event's execution block id.
+    *
+    * @param event the task request to forward
+    */
+   public void handleTaskRequestEvent(TaskRequestEvent event) {
+     query.getStage(event.getExecutionBlockId()).handleTaskRequestEvent(event);
+   }
+ 
+   public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
+     synchronized(diagnostics) {
+       if (diagnostics.size() < 10) {
+         diagnostics.add(report);
+       }
+     }
+ 
+     getEventHandler().handle(new TaskFatalErrorEvent(report));
+   }
+ 
+   /**
+    * Returns a read-only snapshot of the fatal error reports recorded so far.
+    * The copy is taken under the {@code diagnostics} lock, so callers can iterate it safely
+    * while {@link #handleTaskFailed} keeps adding reports. Reports added later do not appear
+    * in a snapshot that was already returned.
+    *
+    * @return an unmodifiable copy of the recorded reports; never {@code null}
+    */
+   public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
+     synchronized(diagnostics) {
+       return Collections.unmodifiableCollection(
+           new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>(diagnostics));
+     }
+   }
+ 
+   /** Routes each stage event to the stage identified by the event's stage id. */
+   private class StageEventDispatcher implements EventHandler<StageEvent> {
+     @Override
+     public void handle(StageEvent event) {
+       ExecutionBlockId stageId = event.getStageId();
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("StageEventDispatcher:" + stageId + "," + event.getType());
+       }
+       Stage target = query.getStage(stageId);
+       target.handle(event);
+     }
+   }
+ 
+   /** Routes each task event to the task it names, looked up through the task's stage. */
+   private class TaskEventDispatcher
+       implements EventHandler<TaskEvent> {
+     @Override
+     public void handle(TaskEvent event) {
+       TaskId id = event.getTaskId();
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("TaskEventDispatcher>" + id + "," + event.getType());
+       }
+       Stage owningStage = query.getStage(id.getExecutionBlockId());
+       owningStage.getTask(id).handle(event);
+     }
+   }
+ 
+   /** Routes each task attempt event to its attempt, resolved via stage and then task. */
+   private class TaskAttemptEventDispatcher
+       implements EventHandler<TaskAttemptEvent> {
+     @Override
+     public void handle(TaskAttemptEvent event) {
+       TaskAttemptId attemptId = event.getTaskAttemptId();
+       // stage -> task -> attempt
+       query.getStage(attemptId.getTaskId().getExecutionBlockId())
+           .getTask(attemptId.getTaskId())
+           .getAttempt(attemptId)
+           .handle(event);
+     }
+   }
+ 
+   /** Hands each scheduler event to the task scheduler of the stage the event names. */
+   private class TaskSchedulerDispatcher
+       implements EventHandler<TaskSchedulerEvent> {
+     @Override
+     public void handle(TaskSchedulerEvent event) {
+       query.getStage(event.getExecutionBlockId()).getTaskScheduler().handle(event);
+     }
+   }
+ 
+   /**
+    * Kills the task attempt named by a local task event on the container the event names.
+    * Events for containers the resource allocator does not know are ignored.
+    */
+   private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
+     @Override
+     public void handle(LocalTaskEvent event) {
+       TajoContainerProxy container =
+           (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
+       if (container == null) {
+         return;
+       }
+       container.killTaskAttempt(event.getTaskAttemptId());
+     }
+   }
+ 
+   /**
+    * Handles the query-completed notification: waits until the query reports a terminated
+    * state, then asks the query master to stop the query.
+    */
+   private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
+     @Override
+     public void handle(QueryMasterQueryCompletedEvent event) {
+       QueryId queryId = event.getQueryId();
+       LOG.info("Query completion notified from " + queryId);
+ 
+       // Poll in 10 ms steps until the query reaches a terminated state.
+       // NOTE(review): an InterruptedException is only logged and the loop continues, so the
+       // interrupt status is lost and this handler keeps blocking if the query never reaches
+       // a terminated state. Confirm the intended shutdown behaviour before changing it.
+       while (!isTerminatedState(query.getSynchronizedState())) {
+         try {
+           synchronized (this) {
+             wait(10);
+           }
+         } catch (InterruptedException e) {
+           LOG.error(e);
+         }
+       }
+       LOG.info("Query final state: " + query.getSynchronizedState());
+ 
+       queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId));
+     }
+   }
+ 
+   /**
+    * Tells whether a query state is one of the four end states.
+    *
+    * @param state the state to test; {@code null} yields {@code false}
+    * @return {@code true} for QUERY_SUCCEEDED, QUERY_FAILED, QUERY_KILLED and QUERY_ERROR
+    */
+   private static boolean isTerminatedState(QueryState state) {
+     if (state == null) {
+       return false;
+     }
+     switch (state) {
+       case QUERY_SUCCEEDED:
+       case QUERY_FAILED:
+       case QUERY_KILLED:
+       case QUERY_ERROR:
+         return true;
+       default:
+         return false;
+     }
+   }
+ 
+   public synchronized void startQuery() {
+     StorageManager sm = null;
+     LogicalPlan plan = null;
+     try {
+       if (query != null) {
+         LOG.warn("Query already started");
+         return;
+       }
++      LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
+       CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
+       LogicalPlanner planner = new LogicalPlanner(catalog);
 -      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
++      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
+       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
+       jsonExpr = null; // remove the possible OOM
+       plan = planner.createPlan(queryContext, expr);
+ 
+       StoreType storeType = PlannerUtil.getStoreType(plan);
+       if (storeType != null) {
+         sm = StorageManager.getStorageManager(systemConf, storeType);
+         StorageProperty storageProperty = sm.getStorageProperty();
+         if (storageProperty.isSortedInsert()) {
+           String tableName = PlannerUtil.getStoreTableName(plan);
+           LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+           TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+           if (tableDesc == null) {
+             throw new VerifyException("Can't get table meta data from catalog: " + tableName);
+           }
+           List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
+               getQueryTaskContext().getQueryContext(), tableDesc);
+           if (storageSpecifiedRewriteRules != null) {
+             for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
+               optimizer.addRuleAfterToJoinOpt(eachRule);
+             }
+           }
+         }
+       }
+ 
+       optimizer.optimize(queryContext, plan);
+ 
+       for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+         LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
+         if (scanNodes != null) {
+           for (LogicalNode eachScanNode : scanNodes) {
+             ScanNode scanNode = (ScanNode) eachScanNode;
+             tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+           }
+         }
+ 
+         scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN);
+         if (scanNodes != null) {
+           for (LogicalNode eachScanNode : scanNodes) {
+             ScanNode scanNode = (ScanNode) eachScanNode;
+             tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+           }
+         }
++
++        scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN);
++        if (scanNodes != null) {
++          for (LogicalNode eachScanNode : scanNodes) {
++            ScanNode scanNode = (ScanNode) eachScanNode;
++            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
++          }
++        }
+       }
+       MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+       queryMasterContext.getGlobalPlanner().build(masterPlan);
+ 
+       query = new Query(queryTaskContext, queryId, querySubmitTime,
+           "", queryTaskContext.getEventHandler(), masterPlan);
+ 
+       dispatcher.register(QueryEventType.class, query);
+       queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START));
+     } catch (Throwable t) {
+       LOG.error(t.getMessage(), t);
+       initError = t;
+ 
+       if (plan != null && sm != null) {
+         LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+         try {
+           sm.rollbackOutputCommit(rootNode.getChild());
+         } catch (IOException e) {
+           LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Creates the staging directory for this query and stores its path in the query context.
+    *
+    * @throws IOException if the staging directory cannot be initialized
+    */
+   private void initStagingDir() throws IOException {
+     Path stagingDir = null;
+     FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
+ 
+     try {
+ 
+       stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext);
+ 
+       // Create a subdirectories
+       LOG.info("The staging dir '" + stagingDir + "' is created.");
+       queryContext.setStagingDir(stagingDir);
+     } catch (IOException ioe) {
+       // NOTE(review): stagingDir is only assigned when the static initStagingDir(...) call
+       // returns normally, so it appears to still be null whenever that call throws and this
+       // cleanup would then never run. The lookup also uses the warehouse file system, which
+       // may differ from the staging directory's own. Confirm before relying on it.
+       if (stagingDir != null && defaultFS.exists(stagingDir)) {
+         try {
+           defaultFS.delete(stagingDir, true);
+           LOG.info("The staging directory '" + stagingDir + "' is deleted");
+         } catch (Exception e) {
+           LOG.warn(e.getMessage());
+         }
+       }
+ 
+       throw ioe;
+     }
+   }
+ 
+   /**
+    * It initializes the final output and staging directory and sets
+    * them to variables.
+    */
+   public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException {
+ 
+     String realUser;
+     String currentUser;
+     UserGroupInformation ugi;
+     ugi = UserGroupInformation.getLoginUser();
+     realUser = ugi.getShortUserName();
+     currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+ 
+     FileSystem fs;
+     Path stagingDir;
+ 
+     ////////////////////////////////////////////
+     // Create Output Directory
+     ////////////////////////////////////////////
+ 
+     String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
+     if (context.isCreateTable() || context.isInsert()) {
+       if (outputPath == null || outputPath.isEmpty()) {
+         // hbase
+         stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+       } else {
+         stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
+       }
+     } else {
+       stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+     }
+ 
+     // initializ
+     fs = stagingDir.getFileSystem(conf);
+ 
+     if (fs.exists(stagingDir)) {
+       throw new IOException("The staging directory '" + stagingDir + "' already exists");
+     }
+     fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+     FileStatus fsStatus = fs.getFileStatus(stagingDir);
+     String owner = fsStatus.getOwner();
+ 
+     if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
+       throw new IOException("The ownership on the user's query " +
+           "directory " + stagingDir + " is not as expected. " +
+           "It is owned by " + owner + ". The directory must " +
+           "be owned by the submitter " + currentUser + " or " +
+           "by " + realUser);
+     }
+ 
+     if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+       LOG.info("Permissions on staging directory " + stagingDir + " are " +
+           "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+           "to correct value " + STAGING_DIR_PERMISSION);
+       fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+     }
+ 
+     Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+     fs.mkdirs(stagingResultDir);
+ 
+     return stagingDir;
+   }
+ 
+   /** @return the query, or {@code null} while {@code startQuery()} has not created it */
+   public Query getQuery() {
+     return query;
+   }
+ 
+   /**
+    * Sends a KILL event to the query unless it has already terminated or is waiting to be
+    * killed. Does nothing when the query was never created, for example because
+    * {@code startQuery()} failed.
+    */
+   protected void expireQuerySession() {
+     Query current = query;
+     if (current == null) {
+       return;
+     }
+ 
+     // Read the state once so both checks see the same value.
+     QueryState state = current.getState();
+     if (!isTerminatedState(state) && state != QueryState.QUERY_KILL_WAIT) {
+       current.handle(new QueryEvent(queryId, QueryEventType.KILL));
+     }
+   }
+ 
+   /** @return the task context created by {@code init}; {@code null} before that */
+   public QueryMasterTaskContext getQueryTaskContext() {
+     return queryTaskContext;
+   }
+ 
+   /** @return the event handler exposed by the task context */
+   public EventHandler getEventHandler() {
+     return queryTaskContext.getEventHandler();
+   }
+ 
+   /** Records the current wall-clock time as the last client heartbeat. */
+   public void touchSessionTime() {
+     this.lastClientHeartbeat.set(System.currentTimeMillis());
+   }
+ 
+   /** @return the last client heartbeat in epoch milliseconds, or -1 if none was recorded */
+   public long getLastClientHeartbeat() {
+     return this.lastClientHeartbeat.get();
+   }
+ 
+   /** @return the id of the query driven by this task */
+   public QueryId getQueryId() {
+     return queryId;
+   }
+ 
+   /** @return {@code true} if {@code init} or {@code startQuery} recorded a failure */
+   public boolean isInitError() {
+     return initError != null;
+   }
+ 
+   /**
+    * Reports the state of the query.
+    *
+    * @return the query's own state once the query exists; before that, QUERY_ERROR if a
+    *         failure was recorded and QUERY_NOT_ASSIGNED otherwise
+    */
+   public QueryState getState() {
+     if (query != null) {
+       return query.getState();
+     }
+     return isInitError() ? QueryState.QUERY_ERROR : QueryState.QUERY_NOT_ASSIGNED;
+   }
+ 
+   /** @return the failure recorded by {@code init} or {@code startQuery}, or {@code null} */
+   public Throwable getInitError() {
+     return initError;
+   }
+ 
+   /**
+    * @return the recorded failure rendered by {@code StringUtils.stringifyException}, or
+    *         {@code null} when no failure was recorded
+    */
+   public String getErrorMessage() {
+     return isInitError() ? StringUtils.stringifyException(initError) : null;
+   }
+ 
+   /** @return the wall-clock time in epoch milliseconds at which this task was constructed */
+   public long getQuerySubmitTime() {
+     return this.querySubmitTime;
+   }
+ 
+   /**
+    * View of the enclosing {@link QueryMasterTask} handed to the components that run the
+    * query. Every accessor reads the enclosing task's fields at call time, so values that
+    * are assigned later (such as the query) become visible once they are set.
+    */
+   public class QueryMasterTaskContext {
+     // Cached on first use by getEventHandler().
+     EventHandler eventHandler;
+     public QueryMaster.QueryMasterContext getQueryMasterContext() {
+       return queryMasterContext;
+     }
+ 
+     public Session getSession() {
+       return session;
+     }
+ 
+     public QueryContext getQueryContext() {
+       return queryContext;
+     }
+ 
+     public TajoConf getConf() {
+       return systemConf;
+     }
+ 
+     public Clock getClock() {
+       return queryMasterContext.getClock();
+     }
+ 
+     /** @return the query, or {@code null} before it has been created */
+     public Query getQuery() {
+       return query;
+     }
+ 
+     public QueryId getQueryId() {
+       return queryId;
+     }
+ 
+     public Path getStagingDir() {
+       return queryContext.getStagingDir();
+     }
+ 
+     /** Fetches the dispatcher's event handler on first use and caches it. */
+     public synchronized EventHandler getEventHandler() {
+       if(eventHandler == null) {
+         eventHandler = dispatcher.getEventHandler();
+       }
+       return eventHandler;
+     }
+ 
+     public AsyncDispatcher getDispatcher() {
+       return dispatcher;
+     }
+ 
+     // Unlike getProgress(), this does not guard against the query not existing yet.
+     public Stage getStage(ExecutionBlockId id) {
+       return query.getStage(id);
+     }
+ 
+     /** @return the live map of table descriptors filled while the query is planned */
+     public Map<String, TableDesc> getTableDescMap() {
+       return tableDescMap;
+     }
+ 
+     /** @return the query's progress, or 0 while the query does not exist yet */
+     public float getProgress() {
+       if(query == null) {
+         return 0.0f;
+       }
+       return query.getProgress();
+     }
+ 
+     public AbstractResourceAllocator getResourceAllocator() {
+       return resourceAllocator;
+     }
+ 
+     public TajoMetrics getQueryMetrics() {
+       return queryMetrics;
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 6a13898,2ae4bed..c10d3b7
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@@ -31,10 -29,9 +31,10 @@@ import org.apache.tajo.annotation.Nulla
  import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
  import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
 +import org.apache.tajo.ipc.ClientProtos.RequestResult;
  import org.apache.tajo.ipc.ClientProtos.ResultCode;
  import org.apache.tajo.ipc.QueryMasterClientProtocol;
- import org.apache.tajo.master.querymaster.QueryMasterTask;
+ import org.apache.tajo.querymaster.QueryMasterTask;
  import org.apache.tajo.rpc.BlockingRpcServer;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
  import org.apache.tajo.util.NetUtils;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 0000000,a125196..4526863
mode 000000,100644..100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@@ -1,0 -1,125 +1,125 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.querymaster;
+ 
+ import org.apache.tajo.*;
+ import org.apache.tajo.algebra.Expr;
+ import org.apache.tajo.benchmark.TPCH;
+ import org.apache.tajo.catalog.CatalogService;
+ import org.apache.tajo.client.TajoClient;
+ import org.apache.tajo.client.TajoClientImpl;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.engine.parser.SQLAnalyzer;
+ import org.apache.tajo.engine.planner.global.GlobalPlanner;
+ import org.apache.tajo.engine.planner.global.MasterPlan;
+ import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.master.event.QueryEvent;
+ import org.apache.tajo.master.event.QueryEventType;
+ import org.apache.tajo.session.Session;
+ import org.apache.tajo.plan.LogicalOptimizer;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.LogicalPlanner;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ 
+ import static org.junit.Assert.*;
+ 
+ public class TestKillQuery {
+   private static TajoTestingCluster cluster;
+   private static TajoConf conf;
+   private static TajoClient client;
+ 
+   /**
+    * Starts a one-worker local mini cluster and registers the TPC-H lineitem data file as
+    * the external table {@code default.lineitem}.
+    */
+   @BeforeClass
+   public static void setUp() throws Exception {
+     cluster = new TajoTestingCluster();
+     cluster.startMiniClusterInLocal(1);
+     conf = cluster.getConfiguration();
+     client = new TajoClientImpl(cluster.getConfiguration());
+     File file = TPCH.getDataFile("lineitem");
+     // Only the two columns used by the test query are declared.
+     client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+         + "using text location 'file://" + file.getAbsolutePath() + "'");
+     assertTrue(client.existTable("default.lineitem"));
+   }
+ 
+   /** Closes the client and shuts the mini cluster down, skipping whatever was never created. */
+   @AfterClass
+   public static void tearDown() throws IOException {
+     if (client != null) {
+       client.close();
+     }
+     if (cluster != null) {
+       cluster.shutdownMiniCluster();
+     }
+   }
+ 
+   /**
+    * Starts a query through a hand-built {@link QueryMasterTask}, waits for the query to be
+    * running and for its first stage to be INITED, fires a KILL event and expects the query
+    * to end in QUERY_KILLED.
+    */
+   @Test
+   public final void testKillQueryFromInitState() throws Exception {
+     SQLAnalyzer analyzer = new SQLAnalyzer();
+     QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+     Session session = LocalTajoTestingUtility.createDummySession();
+     CatalogService catalog = cluster.getMaster().getCatalog();
+     String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey";
+ 
+     LogicalPlanner planner = new LogicalPlanner(catalog);
 -    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
++    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
+     Expr expr =  analyzer.parse(query);
+     LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ 
+     optimizer.optimize(plan);
+ 
+     QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+     QueryContext queryContext = new QueryContext(conf);
+     MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+     GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+     globalPlanner.build(masterPlan);
+ 
+     QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+     QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+         queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson());
+ 
+     // Only the dispatcher is started (not the whole task) before the query is launched.
+     queryMasterTask.init(conf);
+     queryMasterTask.getQueryTaskContext().getDispatcher().start();
+     queryMasterTask.startQuery();
+ 
+     // NOTE(review): in the three try/finally blocks below the assertion runs even when the
+     // wait throws; if the assertion then fails, its AssertionError replaces the original
+     // exception.
+     try{
+       cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2);
+     } finally {
+       assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState());
+     }
+ 
+     Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+     assertNotNull(stage);
+ 
+     try{
+       cluster.waitForStageState(stage, StageState.INITED, 2);
+     } finally {
+       assertEquals(StageState.INITED, stage.getSynchronizedState());
+     }
+ 
+     // fire kill event
+     Query q = queryMasterTask.getQuery();
+     q.handle(new QueryEvent(queryId, QueryEventType.KILL));
+ 
+     try{
+       cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+     } finally {
+       assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+     }
+     queryMasterTask.stop();
+   }
+ }