You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:17:32 UTC
[15/16] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into index_support
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 0000000,bab5903..742665a
mode 000000,100644..100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@@ -1,0 -1,638 +1,650 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.querymaster;
+
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.permission.FsPermission;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.service.CompositeService;
+ import org.apache.hadoop.util.StringUtils;
+ import org.apache.hadoop.yarn.event.AsyncDispatcher;
+ import org.apache.hadoop.yarn.event.EventHandler;
+ import org.apache.hadoop.yarn.util.Clock;
+ import org.apache.tajo.*;
+ import org.apache.tajo.algebra.Expr;
+ import org.apache.tajo.algebra.JsonHelper;
+ import org.apache.tajo.catalog.CatalogService;
+ import org.apache.tajo.catalog.TableDesc;
+ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+ import org.apache.tajo.conf.TajoConf;
++import org.apache.tajo.plan.LogicalOptimizer;
++import org.apache.tajo.plan.LogicalPlan;
++import org.apache.tajo.plan.LogicalPlanner;
++import org.apache.tajo.plan.logical.LogicalRootNode;
++import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
++import org.apache.tajo.plan.util.PlannerUtil;
+ import org.apache.tajo.engine.planner.global.MasterPlan;
+ import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.exception.UnimplementedException;
+ import org.apache.tajo.ha.HAServiceUtil;
+ import org.apache.tajo.ipc.TajoMasterProtocol;
+ import org.apache.tajo.ipc.TajoWorkerProtocol;
+ import org.apache.tajo.master.TajoContainerProxy;
+ import org.apache.tajo.master.event.*;
+ import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+ import org.apache.tajo.session.Session;
+ import org.apache.tajo.plan.LogicalOptimizer;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.LogicalPlanner;
+ import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalRootNode;
+ import org.apache.tajo.plan.logical.NodeType;
+ import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
+ import org.apache.tajo.plan.verifier.VerifyException;
+ import org.apache.tajo.rpc.NettyClientBase;
+ import org.apache.tajo.rpc.RpcConnectionPool;
+ import org.apache.tajo.storage.StorageManager;
+ import org.apache.tajo.storage.StorageProperty;
+ import org.apache.tajo.storage.StorageUtil;
+ import org.apache.tajo.util.metrics.TajoMetrics;
+ import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
+ import org.apache.tajo.worker.AbstractResourceAllocator;
+ import org.apache.tajo.worker.TajoResourceAllocator;
+
+ import java.io.IOException;
+ import java.util.*;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicLong;
+
+ import static org.apache.tajo.TajoProtos.QueryState;
+
+ public class QueryMasterTask extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
+
+ // query submission directory is private!
+ final public static FsPermission STAGING_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx--------
+
+ public static final String TMP_STAGING_DIR_PREFIX = ".staging";
+
+ private QueryId queryId;
+
+ private Session session;
+
+ private QueryContext queryContext;
+
+ private QueryMasterTaskContext queryTaskContext;
+
+ private QueryMaster.QueryMasterContext queryMasterContext;
+
+ private Query query;
+
+ private MasterPlan masterPlan;
+
+ private String jsonExpr;
+
+ private String logicalPlanJson;
+
+ private AsyncDispatcher dispatcher;
+
+ private final long querySubmitTime;
+
+ private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
+
+ private TajoConf systemConf;
+
+ private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
+
+ private AbstractResourceAllocator resourceAllocator;
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private TajoMetrics queryMetrics;
+
+ private Throwable initError;
+
+ private final List<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics =
+ new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
+
+ public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
+ QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
+ String logicalPlanJson) {
+
+ super(QueryMasterTask.class.getName());
+ this.queryMasterContext = queryMasterContext;
+ this.queryId = queryId;
+ this.session = session;
+ this.queryContext = queryContext;
+ this.jsonExpr = jsonExpr;
+ this.logicalPlanJson = logicalPlanJson;
+ this.querySubmitTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ systemConf = (TajoConf)conf;
+
+ try {
+ queryTaskContext = new QueryMasterTaskContext();
+ String resourceManagerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_MANAGER_CLASS);
+
+ if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
+ resourceAllocator = new TajoResourceAllocator(queryTaskContext);
+ } else {
+ throw new UnimplementedException(resourceManagerClassName + " is not supported yet");
+ }
+ addService(resourceAllocator);
+
+ dispatcher = new AsyncDispatcher();
+ addService(dispatcher);
+
+ dispatcher.register(StageEventType.class, new StageEventDispatcher());
+ dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+ dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
+ dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
+ dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+ dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
+
+ initStagingDir();
+
+ queryMetrics = new TajoMetrics(queryId.toString());
+
+ super.init(systemConf);
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ initError = t;
+ }
+ }
+
+ public boolean isStopped() {
+ return stopped.get();
+ }
+
+ @Override
+ public void start() {
+ startQuery();
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+
+ if(stopped.getAndSet(true)) {
+ return;
+ }
+
+ LOG.info("Stopping QueryMasterTask:" + queryId);
+
+ try {
+ resourceAllocator.stop();
+ } catch (Throwable t) {
+ LOG.fatal(t.getMessage(), t);
+ }
+
+ RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
+ NettyClientBase tmClient = null;
+ try {
+ // In TajoMaster HA mode, if backup master be active status,
+ // worker may fail to connect existing active master. Thus,
+ // if worker can't connect the master, worker should try to connect another master and
+ // update master address in worker context.
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ try {
+ tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ } catch (Exception e) {
+ queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
+ HAServiceUtil.getResourceTrackerAddress(systemConf));
+ queryMasterContext.getWorkerContext().setTajoMasterAddress(
+ HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+ tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ }
+ } else {
+ tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ connPool.releaseConnection(tmClient);
+ }
+
+ super.stop();
+
+ //TODO change report to tajo master
+ if (queryMetrics != null) {
+ queryMetrics.report(new MetricsConsoleReporter());
+ }
+
+ LOG.info("Stopped QueryMasterTask:" + queryId);
+ }
+
+ public void handleTaskRequestEvent(TaskRequestEvent event) {
+ ExecutionBlockId id = event.getExecutionBlockId();
+ query.getStage(id).handleTaskRequestEvent(event);
+ }
+
+ public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
+ synchronized(diagnostics) {
+ if (diagnostics.size() < 10) {
+ diagnostics.add(report);
+ }
+ }
+
+ getEventHandler().handle(new TaskFatalErrorEvent(report));
+ }
+
+ public Collection<TajoWorkerProtocol.TaskFatalErrorReport> getDiagnostics() {
+ synchronized(diagnostics) {
+ return Collections.unmodifiableCollection(diagnostics);
+ }
+ }
+
+ private class StageEventDispatcher implements EventHandler<StageEvent> {
+ public void handle(StageEvent event) {
+ ExecutionBlockId id = event.getStageId();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("StageEventDispatcher:" + id + "," + event.getType());
+ }
+ query.getStage(id).handle(event);
+ }
+ }
+
+ private class TaskEventDispatcher
+ implements EventHandler<TaskEvent> {
+ public void handle(TaskEvent event) {
+ TaskId taskId = event.getTaskId();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
+ }
+ Task task = query.getStage(taskId.getExecutionBlockId()).
+ getTask(taskId);
+ task.handle(event);
+ }
+ }
+
+ private class TaskAttemptEventDispatcher
+ implements EventHandler<TaskAttemptEvent> {
+ public void handle(TaskAttemptEvent event) {
+ TaskAttemptId attemptId = event.getTaskAttemptId();
+ Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId());
+ Task task = stage.getTask(attemptId.getTaskId());
+ TaskAttempt attempt = task.getAttempt(attemptId);
+ attempt.handle(event);
+ }
+ }
+
+ private class TaskSchedulerDispatcher
+ implements EventHandler<TaskSchedulerEvent> {
+ public void handle(TaskSchedulerEvent event) {
+ Stage stage = query.getStage(event.getExecutionBlockId());
+ stage.getTaskScheduler().handle(event);
+ }
+ }
+
+ private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
+ @Override
+ public void handle(LocalTaskEvent event) {
+ TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
+ if (proxy != null) {
+ proxy.killTaskAttempt(event.getTaskAttemptId());
+ }
+ }
+ }
+
+ private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
+ @Override
+ public void handle(QueryMasterQueryCompletedEvent event) {
+ QueryId queryId = event.getQueryId();
+ LOG.info("Query completion notified from " + queryId);
+
+ while (!isTerminatedState(query.getSynchronizedState())) {
+ try {
+ synchronized (this) {
+ wait(10);
+ }
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ LOG.info("Query final state: " + query.getSynchronizedState());
+
+ queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId));
+ }
+ }
+
+ private static boolean isTerminatedState(QueryState state) {
+ return
+ state == QueryState.QUERY_SUCCEEDED ||
+ state == QueryState.QUERY_FAILED ||
+ state == QueryState.QUERY_KILLED ||
+ state == QueryState.QUERY_ERROR;
+ }
+
+ public synchronized void startQuery() {
+ StorageManager sm = null;
+ LogicalPlan plan = null;
+ try {
+ if (query != null) {
+ LOG.warn("Query already started");
+ return;
+ }
++ LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
+ CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
+ LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
++ LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
+ Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
+ jsonExpr = null; // remove the possible OOM
+ plan = planner.createPlan(queryContext, expr);
+
+ StoreType storeType = PlannerUtil.getStoreType(plan);
+ if (storeType != null) {
+ sm = StorageManager.getStorageManager(systemConf, storeType);
+ StorageProperty storageProperty = sm.getStorageProperty();
+ if (storageProperty.isSortedInsert()) {
+ String tableName = PlannerUtil.getStoreTableName(plan);
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+ if (tableDesc == null) {
+ throw new VerifyException("Can't get table meta data from catalog: " + tableName);
+ }
+ List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
+ getQueryTaskContext().getQueryContext(), tableDesc);
+ if (storageSpecifiedRewriteRules != null) {
+ for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
+ optimizer.addRuleAfterToJoinOpt(eachRule);
+ }
+ }
+ }
+ }
+
+ optimizer.optimize(queryContext, plan);
+
+ for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+ LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
+ if (scanNodes != null) {
+ for (LogicalNode eachScanNode : scanNodes) {
+ ScanNode scanNode = (ScanNode) eachScanNode;
+ tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+ }
+ }
+
+ scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN);
+ if (scanNodes != null) {
+ for (LogicalNode eachScanNode : scanNodes) {
+ ScanNode scanNode = (ScanNode) eachScanNode;
+ tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+ }
+ }
++
++ scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN);
++ if (scanNodes != null) {
++ for (LogicalNode eachScanNode : scanNodes) {
++ ScanNode scanNode = (ScanNode) eachScanNode;
++ tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
++ }
++ }
+ }
+ MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+ queryMasterContext.getGlobalPlanner().build(masterPlan);
+
+ query = new Query(queryTaskContext, queryId, querySubmitTime,
+ "", queryTaskContext.getEventHandler(), masterPlan);
+
+ dispatcher.register(QueryEventType.class, query);
+ queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START));
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ initError = t;
+
+ if (plan != null && sm != null) {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ try {
+ sm.rollbackOutputCommit(rootNode.getChild());
+ } catch (IOException e) {
+ LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ private void initStagingDir() throws IOException {
+ Path stagingDir = null;
+ FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf);
+
+ try {
+
+ stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext);
+
+ // Create a subdirectories
+ LOG.info("The staging dir '" + stagingDir + "' is created.");
+ queryContext.setStagingDir(stagingDir);
+ } catch (IOException ioe) {
+ if (stagingDir != null && defaultFS.exists(stagingDir)) {
+ try {
+ defaultFS.delete(stagingDir, true);
+ LOG.info("The staging directory '" + stagingDir + "' is deleted");
+ } catch (Exception e) {
+ LOG.warn(e.getMessage());
+ }
+ }
+
+ throw ioe;
+ }
+ }
+
+ /**
+ * It initializes the final output and staging directory and sets
+ * them to variables.
+ */
+ public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException {
+
+ String realUser;
+ String currentUser;
+ UserGroupInformation ugi;
+ ugi = UserGroupInformation.getLoginUser();
+ realUser = ugi.getShortUserName();
+ currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+ FileSystem fs;
+ Path stagingDir;
+
+ ////////////////////////////////////////////
+ // Create Output Directory
+ ////////////////////////////////////////////
+
+ String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
+ if (context.isCreateTable() || context.isInsert()) {
+ if (outputPath == null || outputPath.isEmpty()) {
+ // hbase
+ stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+ } else {
+ stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
+ }
+ } else {
+ stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+ }
+
+ // initializ
+ fs = stagingDir.getFileSystem(conf);
+
+ if (fs.exists(stagingDir)) {
+ throw new IOException("The staging directory '" + stagingDir + "' already exists");
+ }
+ fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+ FileStatus fsStatus = fs.getFileStatus(stagingDir);
+ String owner = fsStatus.getOwner();
+
+ if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
+ throw new IOException("The ownership on the user's query " +
+ "directory " + stagingDir + " is not as expected. " +
+ "It is owned by " + owner + ". The directory must " +
+ "be owned by the submitter " + currentUser + " or " +
+ "by " + realUser);
+ }
+
+ if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+ LOG.info("Permissions on staging directory " + stagingDir + " are " +
+ "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+ "to correct value " + STAGING_DIR_PERMISSION);
+ fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+ }
+
+ Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+ fs.mkdirs(stagingResultDir);
+
+ return stagingDir;
+ }
+
+ public Query getQuery() {
+ return query;
+ }
+
+ protected void expireQuerySession() {
+ if(!isTerminatedState(query.getState()) && !(query.getState() == QueryState.QUERY_KILL_WAIT)){
+ query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+ }
+ }
+
+ public QueryMasterTaskContext getQueryTaskContext() {
+ return queryTaskContext;
+ }
+
+ public EventHandler getEventHandler() {
+ return queryTaskContext.getEventHandler();
+ }
+
+ public void touchSessionTime() {
+ this.lastClientHeartbeat.set(System.currentTimeMillis());
+ }
+
+ public long getLastClientHeartbeat() {
+ return this.lastClientHeartbeat.get();
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public boolean isInitError() {
+ return initError != null;
+ }
+
+ public QueryState getState() {
+ if(query == null) {
+ if (isInitError()) {
+ return QueryState.QUERY_ERROR;
+ } else {
+ return QueryState.QUERY_NOT_ASSIGNED;
+ }
+ } else {
+ return query.getState();
+ }
+ }
+
+ public Throwable getInitError() {
+ return initError;
+ }
+
+ public String getErrorMessage() {
+ if (isInitError()) {
+ return StringUtils.stringifyException(initError);
+ } else {
+ return null;
+ }
+ }
+
+ public long getQuerySubmitTime() {
+ return this.querySubmitTime;
+ }
+
+ public class QueryMasterTaskContext {
+ EventHandler eventHandler;
+ public QueryMaster.QueryMasterContext getQueryMasterContext() {
+ return queryMasterContext;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
+
+ public TajoConf getConf() {
+ return systemConf;
+ }
+
+ public Clock getClock() {
+ return queryMasterContext.getClock();
+ }
+
+ public Query getQuery() {
+ return query;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public Path getStagingDir() {
+ return queryContext.getStagingDir();
+ }
+
+ public synchronized EventHandler getEventHandler() {
+ if(eventHandler == null) {
+ eventHandler = dispatcher.getEventHandler();
+ }
+ return eventHandler;
+ }
+
+ public AsyncDispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public Stage getStage(ExecutionBlockId id) {
+ return query.getStage(id);
+ }
+
+ public Map<String, TableDesc> getTableDescMap() {
+ return tableDescMap;
+ }
+
+ public float getProgress() {
+ if(query == null) {
+ return 0.0f;
+ }
+ return query.getProgress();
+ }
+
+ public AbstractResourceAllocator getResourceAllocator() {
+ return resourceAllocator;
+ }
+
+ public TajoMetrics getQueryMetrics() {
+ return queryMetrics;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 6a13898,2ae4bed..c10d3b7
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@@ -31,10 -29,9 +31,10 @@@ import org.apache.tajo.annotation.Nulla
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
+import org.apache.tajo.ipc.ClientProtos.RequestResult;
import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
- import org.apache.tajo.master.querymaster.QueryMasterTask;
+ import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 0000000,a125196..4526863
mode 000000,100644..100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@@ -1,0 -1,125 +1,125 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.querymaster;
+
+ import org.apache.tajo.*;
+ import org.apache.tajo.algebra.Expr;
+ import org.apache.tajo.benchmark.TPCH;
+ import org.apache.tajo.catalog.CatalogService;
+ import org.apache.tajo.client.TajoClient;
+ import org.apache.tajo.client.TajoClientImpl;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.engine.parser.SQLAnalyzer;
+ import org.apache.tajo.engine.planner.global.GlobalPlanner;
+ import org.apache.tajo.engine.planner.global.MasterPlan;
+ import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.master.event.QueryEvent;
+ import org.apache.tajo.master.event.QueryEventType;
+ import org.apache.tajo.session.Session;
+ import org.apache.tajo.plan.LogicalOptimizer;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.LogicalPlanner;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+
+ import java.io.File;
+ import java.io.IOException;
+
+ import static org.junit.Assert.*;
+
+ public class TestKillQuery {
+ private static TajoTestingCluster cluster;
+ private static TajoConf conf;
+ private static TajoClient client;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = new TajoTestingCluster();
+ cluster.startMiniClusterInLocal(1);
+ conf = cluster.getConfiguration();
+ client = new TajoClientImpl(cluster.getConfiguration());
+ File file = TPCH.getDataFile("lineitem");
+ client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) "
+ + "using text location 'file://" + file.getAbsolutePath() + "'");
+ assertTrue(client.existTable("default.lineitem"));
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (client != null) client.close();
+ if (cluster != null) cluster.shutdownMiniCluster();
+ }
+
+ @Test
+ public final void testKillQueryFromInitState() throws Exception {
+ SQLAnalyzer analyzer = new SQLAnalyzer();
+ QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+ Session session = LocalTajoTestingUtility.createDummySession();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+ String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey";
+
+ LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
++ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
+ Expr expr = analyzer.parse(query);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+ optimizer.optimize(plan);
+
+ QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+ QueryContext queryContext = new QueryContext(conf);
+ MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+ GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+ globalPlanner.build(masterPlan);
+
+ QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+ QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+ queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson());
+
+ queryMasterTask.init(conf);
+ queryMasterTask.getQueryTaskContext().getDispatcher().start();
+ queryMasterTask.startQuery();
+
+ try{
+ cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2);
+ } finally {
+ assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState());
+ }
+
+ Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+ assertNotNull(stage);
+
+ try{
+ cluster.waitForStageState(stage, StageState.INITED, 2);
+ } finally {
+ assertEquals(StageState.INITED, stage.getSynchronizedState());
+ }
+
+ // fire kill event
+ Query q = queryMasterTask.getQuery();
+ q.handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+ try{
+ cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+ } finally {
+ assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+ }
+ queryMasterTask.stop();
+ }
+ }