You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:46 UTC
[24/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
new file mode 100644
index 0000000..a8f5b31
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -0,0 +1,702 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.InsertNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Master-side representation of a single query.
+ *
+ * <p>A query is driven by a YARN-style state machine ({@link #stateMachineFactory}). Execution
+ * blocks of the {@link MasterPlan} are scheduled one at a time as {@link SubQuery}s through an
+ * {@link ExecutionBlockCursor}; when the last non-terminal block completes, the result is
+ * committed and the query-completion hooks run.
+ *
+ * <p>State transitions happen inside {@link #handle(QueryEvent)} while holding the write lock.
+ */
+public class Query implements EventHandler<QueryEvent> {
+  private static final Log LOG = LogFactory.getLog(Query.class);
+
+  // Facilities for Query
+  private final TajoConf systemConf;
+  private final Clock clock;
+  private final String queryStr;
+  // SubQueries scheduled so far, keyed by execution block id. Every access synchronizes on the
+  // map itself, matching getProgress() and KillSubQueriesTransition.
+  private final Map<ExecutionBlockId, SubQuery> subqueries;
+  private final EventHandler eventHandler;
+  private final MasterPlan plan;
+  private final AbstractStorageManager sm;
+  QueryMasterTask.QueryMasterTaskContext context;
+  private final ExecutionBlockCursor cursor;
+
+  // Query Status
+  private final QueryId id;
+  private final long appSubmitTime;
+  private long startTime;
+  private long finishTime;
+  private TableDesc resultDesc;
+  private int completedSubQueryCount = 0;
+  private int successedSubQueryCount = 0;
+  private int killedSubQueryCount = 0;
+  private int failedSubQueryCount = 0;
+  private int erroredSubQueryCount = 0;
+  // Guarded by readLock / writeLock.
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  // Internal Variables
+  private final Lock readLock;
+  private final Lock writeLock;
+  // Priority assigned to the next scheduled SubQuery; decremented after each assignment.
+  private int priority = 100;
+
+  // State Machine
+  private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+
+  // Transition Handler
+  private static final SingleArcTransition<Query, QueryEvent> INTERNAL_ERROR_TRANSITION =
+      new InternalErrorTransition();
+  private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+  private static final SubQueryCompletedTransition SUBQUERY_COMPLETED_TRANSITION = new SubQueryCompletedTransition();
+  private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
+
+  protected static final StateMachineFactory
+      <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+      new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+          (QueryState.QUERY_NEW)
+
+          // Transitions from NEW state
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING,
+              QueryEventType.START,
+              new StartTransition())
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+              QueryEventType.KILL,
+              new KillNewQueryTransition())
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from RUNNING state
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                  QueryState.QUERY_ERROR),
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.KILL,
+              new KillSubQueriesTransition())
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from QUERY_SUCCEEDED state
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          // ignore-able transitions
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.KILL)
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from KILL_WAIT state
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                  QueryState.QUERY_ERROR),
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryEventType.KILL))
+
+          // Transitions from FAILED state
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+              QueryEventType.KILL)
+
+          // Transitions from ERROR state
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.KILL)
+
+          .installTopology();
+
+  /**
+   * Creates a query in the {@code QUERY_NEW} state.
+   *
+   * @param context       the owning QueryMasterTask context (supplies conf, clock, storage manager)
+   * @param id            the query id
+   * @param appSubmitTime the submission time reported by {@link #getAppSubmitTime()}
+   * @param queryStr      the original query string
+   * @param eventHandler  handler used to dispatch query/subquery/completion events
+   * @param plan          the master plan whose execution blocks are scheduled as SubQueries
+   */
+  public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
+               final long appSubmitTime,
+               final String queryStr,
+               final EventHandler eventHandler,
+               final MasterPlan plan) {
+    this.context = context;
+    this.systemConf = context.getConf();
+    this.id = id;
+    this.clock = context.getClock();
+    this.appSubmitTime = appSubmitTime;
+    this.queryStr = queryStr;
+    this.subqueries = Maps.newHashMap();
+    this.eventHandler = eventHandler;
+    this.plan = plan;
+    this.sm = context.getStorageManager();
+    this.cursor = new ExecutionBlockCursor(plan);
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  /**
+   * Returns an estimate of the query progress.
+   *
+   * <p>Returns 1.0 once the query has succeeded or every scheduled SubQuery has succeeded, and
+   * 0.0 before any SubQuery has been scheduled. Otherwise each SubQuery contributes its own
+   * progress weighted equally by the number of schedulable execution blocks.
+   */
+  public float getProgress() {
+    QueryState state = getStateMachine().getCurrentState();
+    if (state == QueryState.QUERY_SUCCEEDED) {
+      return 1.0f;
+    }
+
+    // Copy under the monitor so that subqueries can keep being added while we compute.
+    List<SubQuery> snapshot;
+    synchronized (subqueries) {
+      snapshot = new ArrayList<SubQuery>(subqueries.values());
+    }
+
+    // Nothing has been scheduled yet; reporting "finished" here would be wrong.
+    if (snapshot.isEmpty()) {
+      return 0.0f;
+    }
+
+    // The terminal block is never scheduled as a SubQuery (see SubQueryCompletedTransition.hasNext),
+    // presumably why one block is subtracted. Clamp to 1 so a tiny plan cannot divide by zero.
+    float proportion = 1.0f / Math.max(1, getExecutionBlockCursor().size() - 1);
+
+    float totalProgress = 0.0f;
+    boolean finished = true;
+    for (SubQuery subquery : snapshot) {
+      SubQueryState subState = subquery.getState();
+      if (subState != SubQueryState.NEW) {
+        totalProgress += subquery.getProgress() * proportion;
+      }
+      // A SubQuery that has not succeeded yet (including one still NEW) is not finished.
+      if (subState != SubQueryState.SUCCEEDED) {
+        finished = false;
+      }
+    }
+
+    return finished ? 1.0f : totalProgress;
+  }
+
+  public long getAppSubmitTime() {
+    return this.appSubmitTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /** Records the current clock time as the query start time. */
+  public void setStartTime() {
+    startTime = clock.getTime();
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** Records the current clock time as the query finish time. */
+  public void setFinishTime() {
+    finishTime = clock.getTime();
+  }
+
+  /**
+   * Returns a snapshot of the diagnostic messages collected so far.
+   * The returned list is a copy; later diagnostics are not reflected in it.
+   */
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return new ArrayList<String>(diagnostics);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** Appends a diagnostic message. The write lock is reentrant, so this is safe from handle(). */
+  protected void addDiagnostic(String diag) {
+    writeLock.lock();
+    try {
+      diagnostics.add(diag);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
+  public void setResultDesc(TableDesc desc) {
+    resultDesc = desc;
+  }
+
+  public MasterPlan getPlan() {
+    return plan;
+  }
+
+  public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+    return stateMachine;
+  }
+
+  /** Registers a scheduled SubQuery under its execution block id. */
+  public void addSubQuery(SubQuery subquery) {
+    synchronized (subqueries) {
+      subqueries.put(subquery.getId(), subquery);
+    }
+  }
+
+  public QueryId getId() {
+    return this.id;
+  }
+
+  /** Returns the SubQuery for the given execution block, or {@code null} if none was scheduled. */
+  public SubQuery getSubQuery(ExecutionBlockId id) {
+    synchronized (subqueries) {
+      return this.subqueries.get(id);
+    }
+  }
+
+  /**
+   * Returns a snapshot of the SubQueries scheduled so far. It is safe to iterate while new
+   * SubQueries are being added.
+   */
+  public Collection<SubQuery> getSubQueries() {
+    synchronized (subqueries) {
+      return new ArrayList<SubQuery>(this.subqueries.values());
+    }
+  }
+
+  /** Returns the current state of the query state machine, read under the read lock. */
+  public QueryState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public ExecutionBlockCursor getExecutionBlockCursor() {
+    return cursor;
+  }
+
+  /** NEW -> RUNNING: records the start time and schedules the first execution block. */
+  public static class StartTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent queryEvent) {
+      query.setStartTime();
+      SubQuery subQuery = new SubQuery(query.context, query.getPlan(),
+          query.getExecutionBlockCursor().nextBlock(), query.sm);
+      subQuery.setPriority(query.priority--);
+      query.addSubQuery(subQuery);
+
+      subQuery.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INIT));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+      }
+    }
+  }
+
+  /**
+   * Maps the state of the last SubQuery to the final query state. On success the result is
+   * committed and the completion hooks run.
+   */
+  public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+    @Override
+    public QueryState transition(Query query, QueryEvent queryEvent) {
+      QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent;
+      QueryState finalState;
+      if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) {
+        finalizeQuery(query, subQueryEvent);
+        finalState = QueryState.QUERY_SUCCEEDED;
+      } else if (subQueryEvent.getState() == SubQueryState.FAILED) {
+        finalState = QueryState.QUERY_FAILED;
+      } else if (subQueryEvent.getState() == SubQueryState.KILLED) {
+        finalState = QueryState.QUERY_KILLED;
+      } else {
+        finalState = QueryState.QUERY_ERROR;
+      }
+      // Set the finish time before announcing completion (as KillNewQueryTransition and
+      // InternalErrorTransition do) so a completion handler never sees an unset finish time.
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+      return finalState;
+    }
+
+    /** Commits the output and runs every eligible completion hook. */
+    private void finalizeQuery(Query query, QueryCompletedEvent event) {
+      Path finalOutputDir = commitOutputData(query);
+
+      QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+      try {
+        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
+            finalOutputDir);
+      } catch (Exception e) {
+        // A hook failure does not change the final state; it is reported as a diagnostic.
+        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
+      }
+    }
+
+    /**
+     * It moves a result data stored in a staging output dir into a final output dir.
+     *
+     * @return the directory that holds the result: the query's output path if it has one,
+     *         otherwise the staging result directory
+     */
+    public Path commitOutputData(Query query) {
+      QueryContext queryContext = query.context.getQueryContext();
+      Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
+      if (!queryContext.hasOutputPath()) {
+        // No explicit output location: the result stays in the staging result dir.
+        return stagingResultDir;
+      }
+
+      Path finalOutputDir = queryContext.getOutputPath();
+      try {
+        FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
+        if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO
+          overwriteOutputDir(fs, queryContext, stagingResultDir, finalOutputDir);
+        } else if (fs.rename(stagingResultDir, finalOutputDir)) {
+          LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir + "'");
+        } else {
+          LOG.warn("Failed to move the staging dir '" + stagingResultDir
+              + "' to the output directory '" + finalOutputDir + "'");
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to commit the query result into '" + finalOutputDir + "'", e);
+      }
+      return finalOutputDir;
+    }
+
+    /**
+     * Replaces the contents of {@code finalOutputDir} with {@code stagingResultDir}.
+     *
+     * <p>The original table is first moved into a temporary location inside the staging dir;
+     * then the new result is moved into the original table location. If the new result could
+     * not be committed, the original table is moved back.
+     */
+    private void overwriteOutputDir(FileSystem fs, QueryContext queryContext, Path stagingResultDir,
+                                    Path finalOutputDir) throws IOException {
+      boolean movedToOldTable = false;
+      boolean committed = false;
+      Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+      try {
+        if (fs.exists(finalOutputDir)) {
+          fs.rename(finalOutputDir, oldTableDir);
+          movedToOldTable = fs.exists(oldTableDir);
+        } else { // if the parent does not exist, make its parent directory.
+          fs.mkdirs(finalOutputDir.getParent());
+        }
+        fs.rename(stagingResultDir, finalOutputDir);
+        committed = fs.exists(finalOutputDir);
+      } catch (IOException ioe) {
+        LOG.error("Failed to overwrite '" + finalOutputDir + "' with the query result", ioe);
+      }
+
+      // FileSystem.rename can signal failure by returning false instead of throwing, so the
+      // recovery must run whenever the commit did not happen, not only on IOException.
+      if (movedToOldTable && !committed) {
+        fs.rename(oldTableDir, finalOutputDir);
+      }
+    }
+
+    /** A post-processing step executed after a successful query's output has been committed. */
+    private static interface QueryHook {
+      boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
+      void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
+                   ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
+    }
+
+    /** Runs every registered hook whose {@code isEligible} returns true, in registration order. */
+    private class QueryHookExecutor {
+      private List<QueryHook> hookList = TUtil.newList();
+      private QueryMaster.QueryMasterContext context;
+
+      public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
+        this.context = context;
+        hookList.add(new MaterializedResultHook());
+        hookList.add(new CreateTableHook());
+        hookList.add(new InsertTableHook());
+      }
+
+      public void execute(QueryContext queryContext, Query query,
+                          ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        for (QueryHook hook : hookList) {
+          if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
+            hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
+          }
+        }
+      }
+    }
+
+    /** For plain queries (neither CREATE TABLE nor INSERT): describes the result as an external table. */
+    private class MaterializedResultHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        NodeType type = lastStage.getBlock().getPlan().getType();
+        return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+        TableStats stats = lastStage.getResultStats();
+
+        TableDesc resultTableDesc =
+            new TableDesc(
+                query.getId().toString(),
+                lastStage.getSchema(),
+                meta,
+                finalOutputDir);
+        resultTableDesc.setExternal(true);
+
+        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+        resultTableDesc.setStats(stats);
+        query.setResultDesc(resultTableDesc);
+      }
+    }
+
+    /** For CREATE TABLE queries: registers the new table in the catalog. */
+    private class CreateTableHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableStats stats = lastStage.getResultStats();
+
+        CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
+        TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions());
+
+        TableDesc tableDescTobeCreated =
+            new TableDesc(
+                createTableNode.getTableName(),
+                createTableNode.getTableSchema(),
+                meta,
+                finalOutputDir);
+        tableDescTobeCreated.setExternal(createTableNode.isExternal());
+
+        if (createTableNode.hasPartition()) {
+          tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
+        }
+
+        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+        tableDescTobeCreated.setStats(stats);
+        query.setResultDesc(tableDescTobeCreated);
+
+        catalog.createTable(tableDescTobeCreated);
+      }
+    }
+
+    /** For INSERT queries: refreshes the target table's stats in the catalog (drop + re-create). */
+    private class InsertTableHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        return lastStage.getBlock().getPlan().getType() == NodeType.INSERT;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
+          throws Exception {
+
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+        TableStats stats = lastStage.getResultStats();
+
+        InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
+
+        TableDesc finalTable;
+        if (insertNode.hasTargetTable()) {
+          String tableName = insertNode.getTableName();
+          finalTable = catalog.getTableDesc(tableName);
+        } else {
+          String tableName = query.getId().toString();
+          finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir);
+        }
+
+        long volume = getTableVolume(query.systemConf, finalOutputDir);
+        stats.setNumBytes(volume);
+        finalTable.setStats(stats);
+
+        if (insertNode.hasTargetTable()) {
+          catalog.dropTable(insertNode.getTableName());
+          catalog.createTable(finalTable);
+        }
+
+        query.setResultDesc(finalTable);
+      }
+    }
+
+    /** Returns the total length in bytes of everything under {@code tablePath}. */
+    private long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+      FileSystem fs = tablePath.getFileSystem(systemConf);
+      ContentSummary directorySummary = fs.getContentSummary(tablePath);
+      return directorySummary.getLength();
+    }
+  }
+
+  /**
+   * Updates the SubQuery counters. It then either schedules the next execution block or
+   * announces query completion.
+   */
+  public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    /** Returns true if the next block of the cursor is not the terminal block. */
+    private boolean hasNext(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.peek();
+      return !query.getPlan().isTerminal(nextBlock);
+    }
+
+    private void executeNextBlock(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.nextBlock();
+      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
+      nextSubQuery.setPriority(query.priority--);
+      query.addSubQuery(nextSubQuery);
+      nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
+
+      LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+        LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+      }
+    }
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      try {
+        query.completedSubQueryCount++;
+        SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+
+        if (castEvent.getState() == SubQueryState.SUCCEEDED) {
+          query.successedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.KILLED) {
+          query.killedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.FAILED) {
+          query.failedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.ERROR) {
+          query.erroredSubQueryCount++;
+        } else {
+          LOG.error(String.format("Invalid SubQuery (%s) State %s at %s",
+              castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getState().name()));
+          query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+          // INTERNAL_ERROR already finishes the query; do not also emit a QueryCompletedEvent.
+          return;
+        }
+
+        // if a subquery is succeeded and a query is running
+        if (castEvent.getState() == SubQueryState.SUCCEEDED && // latest subquery succeeded
+            query.getState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
+            hasNext(query)) {                                   // there remains at least one subquery.
+          executeNextBlock(query);
+        } else { // if a query is completed due to finished, kill, failure, or error
+          query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+        }
+      } catch (Throwable t) {
+        LOG.error(t.getMessage(), t);
+        query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+      }
+    }
+  }
+
+  /** Appends the diagnostic carried by a {@link QueryDiagnosticsUpdateEvent}. */
+  private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+    }
+  }
+
+  /** NEW -> KILLED: nothing was scheduled, so just finish and announce completion. */
+  private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+    }
+  }
+
+  /** RUNNING -> KILL_WAIT: sends SQ_KILL to every SubQuery scheduled so far. */
+  private static class KillSubQueriesTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      synchronized (query.subqueries) {
+        for (SubQuery subquery : query.subqueries.values()) {
+          query.eventHandler.handle(new SubQueryEvent(subquery.getId(), SubQueryEventType.SQ_KILL));
+        }
+      }
+    }
+  }
+
+  /** Any state -> ERROR: records the finish time and announces completion. */
+  private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+    }
+  }
+
+  /**
+   * Applies {@code event} to the state machine under the write lock. An event that is invalid
+   * in the current state is logged and turned into an INTERNAL_ERROR event.
+   */
+  @Override
+  public void handle(QueryEvent event) {
+    LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+    // Acquire before entering try so that finally never unlocks a lock that was not acquired.
+    writeLock.lock();
+    try {
+      QueryState oldState = getState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
+      }
+
+      // Log the state change, if any.
+      QueryState newState = getState();
+      if (oldState != newState) {
+        LOG.info(id + " Query Transitioned from " + oldState + " to " + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
new file mode 100644
index 0000000..de323cd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
+
+public class QueryInProgress extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+ private QueryId queryId;
+
+ private Session session;
+
+ private QueryContext queryContext;
+
+ private TajoAsyncDispatcher dispatcher;
+
+ private LogicalRootNode plan;
+
+ private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private QueryInfo queryInfo;
+
+ private final TajoMaster.MasterContext masterContext;
+
+ private NettyClientBase queryMasterRpc;
+
+ private QueryMasterProtocolService queryMasterRpcClient;
+
+ private YarnProtos.ContainerIdProto qmContainerId;
+
+ public QueryInProgress(
+ TajoMaster.MasterContext masterContext,
+ Session session,
+ QueryContext queryContext,
+ QueryId queryId, String sql, LogicalRootNode plan) {
+ super(QueryInProgress.class.getName());
+ this.masterContext = masterContext;
+ this.session = session;
+ this.queryContext = queryContext;
+ this.queryId = queryId;
+ this.plan = plan;
+
+ queryInfo = new QueryInfo(queryId, sql);
+ queryInfo.setStartTime(System.currentTimeMillis());
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ dispatcher = new TajoAsyncDispatcher("QueryInProgress:" + queryId);
+ this.addService(dispatcher);
+
+ dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
+ super.init(conf);
+ }
+
+ public void kill() {
+ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ }
+
+ @Override
+ public void stop() {
+ if(stopped.getAndSet(true)) {
+ return;
+ }
+
+ LOG.info("=========================================================");
+ LOG.info("Stop query:" + queryId);
+
+ masterContext.getResourceManager().stopQueryMaster(queryId);
+
+ long startTime = System.currentTimeMillis();
+ while(true) {
+ try {
+ if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
+ LOG.info(queryId + " QueryMaster stopped");
+ break;
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ break;
+ }
+
+ try {
+ synchronized (this){
+ wait(100);
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ if(System.currentTimeMillis() - startTime > 60 * 1000) {
+ LOG.warn("Failed to stop QueryMaster:" + queryId);
+ break;
+ }
+ }
+
+ if(queryMasterRpc != null) {
+ RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
+ }
+ super.stop();
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ }
+
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+
+
+ public boolean startQueryMaster() {
+ try {
+ LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+ WorkerResourceManager resourceManager = masterContext.getResourceManager();
+ WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+ // if no resource to allocate a query master
+ if(resource == null) {
+ LOG.info("No Available Resources for QueryMaster");
+ return false;
+ }
+
+ queryInfo.setQueryMaster(resource.getWorkerHost());
+ queryInfo.setQueryMasterPort(resource.getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(resource.getClientPort());
+
+ getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
+
+ return true;
+ } catch (Exception e) {
+ catchException(e);
+ return false;
+ }
+ }
+
+ class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
+ @Override
+ public void handle(QueryJobEvent queryJobEvent) {
+ if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+ heartbeat(queryJobEvent.getQueryInfo());
+ } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+ QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
+ queryInProgress.getEventHandler().handle(
+ new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
+ } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
+ submmitQueryToMaster();
+ } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) {
+ stop();
+ } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+ kill();
+ }
+ }
+ }
+
+ public QueryMasterProtocolService getQueryMasterRpcClient() {
+ return queryMasterRpcClient;
+ }
+
+ private void connectQueryMaster() throws Exception {
+ InetSocketAddress addr = NetUtils.createSocketAddrForHost(
+ queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+ LOG.info("Connect to QueryMaster:" + addr);
+ queryMasterRpc =
+ RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true);
+ queryMasterRpcClient = queryMasterRpc.getStub();
+ }
+
+ private synchronized void submmitQueryToMaster() {
+ if(querySubmitted.get()) {
+ return;
+ }
+
+ try {
+ if(queryMasterRpcClient == null) {
+ connectQueryMaster();
+ }
+ if(queryMasterRpcClient == null) {
+ LOG.info("No QueryMaster conneciton info.");
+ //TODO wait
+ return;
+ }
+ LOG.info("Call executeQuery to :" +
+ queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+ queryMasterRpcClient.executeQuery(
+ null,
+ TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
+ .setQueryId(queryId.getProto())
+ .setSession(session.getProto())
+ .setQueryContext(queryContext.getProto())
+ .setSql(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getSql()))
+ .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
+ .build(), NullCallback.get());
+ querySubmitted.set(true);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ public void catchException(Exception e) {
+ LOG.error(e.getMessage(), e);
+ queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+ queryInfo.setLastMessage(StringUtils.stringifyException(e));
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public QueryInfo getQueryInfo() {
+ return this.queryInfo;
+ }
+
+ private void heartbeat(QueryInfo queryInfo) {
+ LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+ this.queryInfo.setQueryState(queryInfo.getQueryState());
+ this.queryInfo.setProgress(queryInfo.getProgress());
+ this.queryInfo.setFinishTime(queryInfo.getFinishTime());
+
+ if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+ this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+ LOG.info(queryId + queryInfo.getLastMessage());
+ }
+ if(this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+ //TODO needed QueryMaster's detail status(failed before or after launching worker)
+ //queryMasterStopped.set(true);
+ LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+ }
+
+ if(!querySubmitted.get()) {
+ getEventHandler().handle(
+ new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, this.queryInfo));
+ }
+
+ if(isFinishState(this.queryInfo.getQueryState())) {
+ getEventHandler().handle(
+ new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo));
+ }
+ }
+
+ private boolean isFinishState(TajoProtos.QueryState state) {
+ return state == TajoProtos.QueryState.QUERY_FAILED ||
+ state == TajoProtos.QueryState.QUERY_KILLED ||
+ state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
new file mode 100644
index 0000000..b077b36
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+
+public class QueryInfo {
+ private QueryId queryId;
+ private String sql;
+ private TajoProtos.QueryState queryState;
+ private float progress;
+ private long startTime;
+ private long finishTime;
+ private String lastMessage;
+ private String hostNameOfQM;
+ private int queryMasterPort;
+ private int queryMasterClientPort;
+
+ public QueryInfo(QueryId queryId) {
+ this(queryId, null);
+ }
+
+ public QueryInfo(QueryId queryId, String sql) {
+ this.queryId = queryId;
+ this.sql = sql;
+ this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public String getQueryMasterHost() {
+ return hostNameOfQM;
+ }
+
+ public void setQueryMaster(String hostName) {
+ this.hostNameOfQM = hostName;
+
+ }
+
+ public void setQueryMasterPort(int port) {
+ this.queryMasterPort = port;
+ }
+
+ public int getQueryMasterPort() {
+ return queryMasterPort;
+ }
+
+ public void setQueryMasterclientPort(int port) {
+ queryMasterClientPort = port;
+ }
+
+ public int getQueryMasterClientPort() {
+ return queryMasterClientPort;
+ }
+
+ public TajoProtos.QueryState getQueryState() {
+ return queryState;
+ }
+
+ public void setQueryState(TajoProtos.QueryState queryState) {
+ this.queryState = queryState;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public String getLastMessage() {
+ return lastMessage;
+ }
+
+ public void setLastMessage(String lastMessage) {
+ this.lastMessage = lastMessage;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ @Override
+ public String toString() {
+ return queryId.toString() + "state=" + queryState +",progress=" + progress + ", queryMaster="
+ + getQueryMasterHost();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
new file mode 100644
index 0000000..811de1b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
+ private QueryInfo queryInfo;
+
+ public QueryJobEvent(Type type, QueryInfo queryInfo) {
+ super(type);
+
+ this.queryInfo = queryInfo;
+ }
+
+ public QueryInfo getQueryInfo() {
+ return this.queryInfo;
+ }
+
+ public enum Type {
+ QUERY_JOB_START,
+ QUERY_JOB_HEARTBEAT,
+ QUERY_JOB_FINISH,
+ QUERY_MASTER_START,
+ QUERY_MASTER_STOP,
+ QUERY_JOB_KILL
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
new file mode 100644
index 0000000..ca45534
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.session.Session;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+
+public class QueryJobManager extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
+
+ // TajoMaster Context
+ private final TajoMaster.MasterContext masterContext;
+
+ private AsyncDispatcher dispatcher;
+
+ private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
+
+ private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
+
+ public QueryJobManager(final TajoMaster.MasterContext masterContext) {
+ super(QueryJobManager.class.getName());
+ this.masterContext = masterContext;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ try {
+ this.dispatcher = new AsyncDispatcher();
+ addService(this.dispatcher);
+
+ this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+ } catch (Exception e) {
+ catchException(null, e);
+ }
+
+ super.init(conf);
+ }
+
+ @Override
+ public void stop() {
+ synchronized(runningQueries) {
+ for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+ eachQueryInProgress.stop();
+ }
+ }
+ super.stop();
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ }
+
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ public Collection<QueryInProgress> getRunningQueries() {
+ return Collections.unmodifiableCollection(runningQueries.values());
+ }
+
+ public Collection<QueryInProgress> getFinishedQueries() {
+ return Collections.unmodifiableCollection(finishedQueries.values());
+ }
+
+ public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql, LogicalRootNode plan)
+ throws Exception {
+ QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+ QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, plan);
+
+ synchronized(runningQueries) {
+ runningQueries.put(queryId, queryInProgress);
+ }
+
+ addService(queryInProgress);
+ queryInProgress.init(getConfig());
+ queryInProgress.start();
+
+ if(!queryInProgress.startQueryMaster()) {
+ return null;
+ }
+
+ return queryInProgress.getQueryInfo();
+ }
+
+ class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+ @Override
+ public void handle(QueryJobEvent event) {
+ QueryInProgress queryInProgress = null;
+ synchronized(runningQueries) {
+ queryInProgress = runningQueries.get(event.getQueryInfo().getQueryId());
+ if(queryInProgress == null) {
+ LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+ return;
+ }
+ }
+ queryInProgress.getEventHandler().handle(event);
+ }
+ }
+
+ public QueryInProgress getQueryInProgress(QueryId queryId) {
+ synchronized(runningQueries) {
+ return runningQueries.get(queryId);
+ }
+ }
+
+ public QueryInProgress getFinishedQuery(QueryId queryId) {
+ synchronized(finishedQueries) {
+ return finishedQueries.get(queryId);
+ }
+ }
+
+ public void stopQuery(QueryId queryId) {
+ LOG.info("Stop QueryInProgress:" + queryId);
+ QueryInProgress queryInProgress = getQueryInProgress(queryId);
+ if(queryInProgress != null) {
+ queryInProgress.stop();
+ synchronized(runningQueries) {
+ runningQueries.remove(queryId);
+ finishedQueries.put(queryId, queryInProgress);
+ }
+ } else {
+ LOG.warn("No QueryInProgress while query stopping: " + queryId);
+ }
+ }
+
+ private void catchException(QueryId queryId, Exception e) {
+ LOG.error(e.getMessage(), e);
+ QueryInProgress queryInProgress = runningQueries.get(queryId);
+ queryInProgress.catchException(e);
+ }
+
+ public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+ TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+ QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+ if(queryInProgress == null) {
+ return null;
+ }
+
+ QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+ getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+
+ return null;
+ }
+
+ private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
+ QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+ if(queryHeartbeat.getTajoWorkerHost() != null) {
+ queryInfo.setQueryMaster(queryHeartbeat.getTajoWorkerHost());
+ queryInfo.setQueryMasterPort(queryHeartbeat.getTajoQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(queryHeartbeat.getTajoWorkerClientPort());
+ }
+ queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+ queryInfo.setQueryState(queryHeartbeat.getState());
+ queryInfo.setProgress(queryHeartbeat.getQueryProgress());
+
+ if (queryHeartbeat.hasQueryFinishTime()) {
+ queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
+ }
+
+ return queryInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
new file mode 100644
index 0000000..523f5ba
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.event.QueryStartEvent;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
+
+// TODO - when exception, send error status to QueryJobManager
+public class QueryMaster extends CompositeService implements EventHandler {
+ private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
+
+ private int querySessionTimeout;
+
+ private Clock clock;
+
+ private TajoAsyncDispatcher dispatcher;
+
+ private GlobalPlanner globalPlanner;
+
+ private AbstractStorageManager storageManager;
+
+ private TajoConf systemConf;
+
+ private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
+
+ private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap();
+
+ private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
+
+ private AtomicBoolean queryMasterStop = new AtomicBoolean(false);
+
+ private QueryMasterContext queryMasterContext;
+
+ private QueryHeartbeatThread queryHeartbeatThread;
+
+ private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread;
+
+ private TajoWorker.WorkerContext workerContext;
+
+ private RpcConnectionPool connPool;
+
+ public QueryMaster(TajoWorker.WorkerContext workerContext) {
+ super(QueryMaster.class.getName());
+ this.workerContext = workerContext;
+ }
+
+ public void init(Configuration conf) {
+ LOG.info("QueryMaster init");
+ try {
+ this.systemConf = (TajoConf)conf;
+ this.connPool = RpcConnectionPool.getPool(systemConf);
+
+ querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
+ queryMasterContext = new QueryMasterContext(systemConf);
+
+ clock = new SystemClock();
+
+ this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
+ addIfService(dispatcher);
+
+ this.storageManager = StorageManagerFactory.getStorageManager(systemConf);
+
+ globalPlanner = new GlobalPlanner(systemConf, workerContext);
+
+ dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
+
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ throw new RuntimeException(t);
+ }
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ LOG.info("QueryMaster start");
+
+ queryHeartbeatThread = new QueryHeartbeatThread();
+ queryHeartbeatThread.start();
+
+ clientSessionTimeoutCheckThread = new ClientSessionTimeoutCheckThread();
+ clientSessionTimeoutCheckThread.start();
+
+ finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread();
+ finishedQueryMasterTaskCleanThread.start();
+
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if(queryMasterStop.getAndSet(true)){
+ return;
+ }
+
+ if(queryHeartbeatThread != null) {
+ queryHeartbeatThread.interrupt();
+ }
+
+ if(clientSessionTimeoutCheckThread != null) {
+ clientSessionTimeoutCheckThread.interrupt();
+ }
+
+ if(finishedQueryMasterTaskCleanThread != null) {
+ finishedQueryMasterTaskCleanThread.interrupt();
+ }
+ super.stop();
+
+ LOG.info("QueryMaster stop");
+ if(queryMasterContext.getWorkerContext().isYarnContainerMode()) {
+ queryMasterContext.getWorkerContext().stopWorker(true);
+ }
+ }
+
+ private void cleanup(QueryId queryId) {
+ LOG.info("cleanup query resources : " + queryId);
+ NettyClientBase rpc = null;
+ List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+
+ for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ try {
+ if (worker.getPeerRpcPort() == 0) continue;
+
+ rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+ TajoWorkerProtocol.class, true);
+ TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
+
+ tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ } finally {
+ connPool.releaseConnection(rpc);
+ }
+ }
+ }
+
+ public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+
+ NettyClientBase rpc = null;
+ try {
+ rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+
+ CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
+ new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+ masterService.getAllWorkerResource(callBack.getController(),
+ PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
+
+ TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+ return workerResourcesRequest.getWorkerResourcesList();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ connPool.releaseConnection(rpc);
+ }
+ return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
+ }
+
+ public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
+ LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
+ NettyClientBase tmClient = null;
+ try {
+ tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+
+ TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
+ .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
+ .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
+ .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setState(state)
+ .setQueryId(queryId.getProto());
+
+ CallFuture<TajoHeartbeatResponse> callBack =
+ new CallFuture<TajoHeartbeatResponse>();
+
+ masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ connPool.releaseConnection(tmClient);
+ }
+ }
+
+ @Override
+ public void handle(Event event) {
+ dispatcher.getEventHandler().handle(event);
+ }
+
+ public Query getQuery(QueryId queryId) {
+ return queryMasterTasks.get(queryId).getQuery();
+ }
+
+ public QueryMasterTask getQueryMasterTask(QueryId queryId) {
+ return queryMasterTasks.get(queryId);
+ }
+
+ public QueryMasterTask getQueryMasterTask(QueryId queryId, boolean includeFinished) {
+ QueryMasterTask queryMasterTask = queryMasterTasks.get(queryId);
+ if(queryMasterTask != null) {
+ return queryMasterTask;
+ } else {
+ if(includeFinished) {
+ return finishedQueryMasterTasks.get(queryId);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ public QueryMasterContext getContext() {
+ return this.queryMasterContext;
+ }
+
+ public Collection<QueryMasterTask> getQueryMasterTasks() {
+ return queryMasterTasks.values();
+ }
+
+ public Collection<QueryMasterTask> getFinishedQueryMasterTasks() {
+ return finishedQueryMasterTasks.values();
+ }
+
+ public class QueryMasterContext {
+ private TajoConf conf;
+
+ public QueryMasterContext(TajoConf conf) {
+ this.conf = conf;
+ }
+
+ public TajoConf getConf() {
+ return conf;
+ }
+
+ public TajoAsyncDispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public Clock getClock() {
+ return clock;
+ }
+
+ public AbstractStorageManager getStorageManager() {
+ return storageManager;
+ }
+
+ public QueryMaster getQueryMaster() {
+ return QueryMaster.this;
+ }
+
+ public GlobalPlanner getGlobalPlanner() {
+ return globalPlanner;
+ }
+
+ public TajoWorker.WorkerContext getWorkerContext() {
+ return workerContext;
+ }
+
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ public void stopQuery(QueryId queryId) {
+ QueryMasterTask queryMasterTask;
+ queryMasterTask = queryMasterTasks.remove(queryId);
+ finishedQueryMasterTasks.put(queryId, queryMasterTask);
+
+ if(queryMasterTask != null) {
+ TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
+ CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
+
+ NettyClientBase tmClient = null;
+ try {
+ tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
+ } catch (Exception e) {
+ //this function will be closed in new thread.
+ //When tajo do stop cluster, tajo master maybe throw closed connection exception
+
+ LOG.error(e.getMessage(), e);
+ } finally {
+ connPool.releaseConnection(tmClient);
+ }
+
+ try {
+ queryMasterTask.stop();
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")
+ && !workerContext.isYarnContainerMode()) {
+ cleanup(queryId); // TODO We will support yarn mode
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ } else {
+ LOG.warn("No query info:" + queryId);
+ }
+ if(workerContext.isYarnContainerMode()) {
+ stop();
+ }
+ }
+ }
+
+ private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
+ TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
+
+ builder.setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName());
+ builder.setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort());
+ builder.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort());
+ builder.setState(queryMasterTask.getState());
+ builder.setQueryId(queryMasterTask.getQueryId().getProto());
+
+ if (queryMasterTask.getQuery() != null) {
+ builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
+ builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime());
+ }
+ return builder.build();
+ }
+
+ private class QueryStartEventHandler implements EventHandler<QueryStartEvent> {
+ @Override
+ public void handle(QueryStartEvent event) {
+ LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
+ QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
+ event.getQueryId(), event.getSession(), event.getQueryContext(), event.getSql(), event.getLogicalPlanJson());
+
+ queryMasterTask.init(systemConf);
+ if (!queryMasterTask.isInitError()) {
+ queryMasterTask.start();
+ }
+
+ synchronized(queryMasterTasks) {
+ queryMasterTasks.put(event.getQueryId(), queryMasterTask);
+ }
+
+ if (queryMasterTask.isInitError()) {
+ queryMasterContext.stopQuery(queryMasterTask.getQueryId());
+ return;
+ }
+ }
+ }
+
+ class QueryHeartbeatThread extends Thread {
+ public QueryHeartbeatThread() {
+ super("QueryHeartbeatThread");
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Start QueryMaster heartbeat thread");
+ while(!queryMasterStop.get()) {
+ List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+ synchronized(queryMasterTasks) {
+ tempTasks.addAll(queryMasterTasks.values());
+ }
+ synchronized(queryMasterTasks) {
+ for(QueryMasterTask eachTask: tempTasks) {
+ NettyClientBase tmClient;
+ try {
+ tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
+ TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+
+ CallFuture<TajoHeartbeatResponse> callBack =
+ new CallFuture<TajoHeartbeatResponse>();
+
+ TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
+ masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ }
+ synchronized(queryMasterStop) {
+ try {
+ queryMasterStop.wait(2000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ LOG.info("QueryMaster heartbeat thread stopped");
+ }
+ }
+
+ class ClientSessionTimeoutCheckThread extends Thread {
+ public void run() {
+ LOG.info("ClientSessionTimeoutCheckThread started");
+ while(!queryMasterStop.get()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ List<QueryMasterTask> tempTasks = new ArrayList<QueryMasterTask>();
+ synchronized(queryMasterTasks) {
+ tempTasks.addAll(queryMasterTasks.values());
+ }
+
+ for(QueryMasterTask eachTask: tempTasks) {
+ if(!eachTask.isStopped()) {
+ try {
+ long lastHeartbeat = eachTask.getLastClientHeartbeat();
+ long time = System.currentTimeMillis() - lastHeartbeat;
+ if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) {
+ LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms");
+ eachTask.expiredSessionTimeout();
+ }
+ } catch (Exception e) {
+ LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ class FinishedQueryMasterTaskCleanThread extends Thread {
+ public void run() {
+ int expireIntervalTime = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
+ LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
+ while(!queryMasterStop.get()) {
+ try {
+ Thread.sleep(60 * 1000 * 60); // hourly
+ } catch (InterruptedException e) {
+ break;
+ }
+ try {
+ long expireTime = System.currentTimeMillis() - expireIntervalTime * 60 * 1000;
+ cleanExpiredFinishedQueryMasterTask(expireTime);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
+ synchronized(finishedQueryMasterTasks) {
+ List<QueryId> expiredQueryIds = new ArrayList<QueryId>();
+ for(Map.Entry<QueryId, QueryMasterTask> entry: finishedQueryMasterTasks.entrySet()) {
+ if(entry.getValue().getStartTime() < expireTime) {
+ expiredQueryIds.add(entry.getKey());
+ }
+ }
+
+ for(QueryId eachId: expiredQueryIds) {
+ finishedQueryMasterTasks.remove(eachId);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
new file mode 100644
index 0000000..bf59e9f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tajo.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.LazyTaskScheduler;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Worker-side service that exposes {@link QueryMasterProtocol} over an {@link AsyncRpcServer}
+ * and owns the {@link QueryMaster} child service. RPC handlers translate incoming protobuf
+ * requests into events on the matching {@link QueryMasterTask}.
+ */
+public class QueryMasterManagerService extends CompositeService
+    implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
+  private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());
+
+  /** RPC server serving QueryMasterProtocol; created and started in {@link #init(Configuration)}. */
+  private AsyncRpcServer rpcServer;
+  /** Connect address derived from the RPC server's listen address. */
+  private InetSocketAddress bindAddr;
+  /** {@code host:port} form of {@link #bindAddr}, published as WORKER_QM_RPC_ADDRESS. */
+  private String addr;
+  /** Port requested at construction; replaced by the actually bound port once init succeeds. */
+  private int port;
+
+  private QueryMaster queryMaster;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
+    super(QueryMasterManagerService.class.getName());
+    this.workerContext = workerContext;
+    this.port = port;
+  }
+
+  public QueryMaster getQueryMaster() {
+    return queryMaster;
+  }
+
+  /**
+   * Starts the RPC server, registers a new {@link QueryMaster} as a child service and writes
+   * the bound {@code host:port} into {@code TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS}.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   * @throws RuntimeException if the RPC server or the QueryMaster cannot be set up
+   */
+  @Override
+  public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf tajoConf = (TajoConf) conf;
+    try {
+      // Setup RPC server
+      InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+      this.port = bindAddr.getPort();
+
+      queryMaster = new QueryMaster(workerContext);
+      addService(queryMaster);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      // Continuing would publish a null address and leave queryMaster unset, which only
+      // surfaces later as an unrelated failure. Release the RPC server and fail fast instead.
+      if (rpcServer != null) {
+        rpcServer.shutdown();
+        rpcServer = null;
+      }
+      throw new RuntimeException("Failed to initialize QueryMasterManagerService", e);
+    }
+    // Publish the bound QueryMaster RPC address in the configuration.
+    LOG.info("QueryMasterManagerService is bind to " + addr);
+    tajoConf.setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /** Shuts down the RPC server (if it was created) and then stops child services. */
+  @Override
+  public void stop() {
+    if (rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("QueryMasterManagerService stopped");
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  public String getHostAndPort() {
+    return bindAddr.getHostName() + ":" + bindAddr.getPort();
+  }
+
+  /**
+   * Hands a task request to the owning QueryMasterTask, or immediately replies with
+   * {@code LazyTaskScheduler.stopTaskRunnerReq} when the query is unknown or stopped.
+   */
+  @Override
+  public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
+                      RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
+    try {
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
+      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
+
+      if (queryMasterTask == null || queryMasterTask.isStopped()) {
+        done.run(LazyTaskScheduler.stopTaskRunnerReq);
+      } else {
+        ContainerId cid =
+            queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("getTask:" + cid + ", ebId:" + ebId);
+        }
+        // 'done' travels with the event; presumably the event's handler answers through it.
+        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Applies a task status report: a TA_KILLED report goes straight to the attempt, any other
+   * state is dispatched as a TaskAttemptStatusUpdateEvent. Replies TRUE on success, FALSE on
+   * failure or when no QueryMasterTask is found for the query.
+   */
+  @Override
+  public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+      if (queryMasterTask == null) {
+        // NOTE(review): the boolean presumably widens the lookup (e.g. to finished queries);
+        // confirm against QueryMaster#getQueryMasterTask(QueryId, boolean).
+        queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+      }
+      if (queryMasterTask == null) {
+        LOG.warn("No QueryMasterTask for " + queryId + "; ignoring status update of " + attemptId);
+        done.run(TajoWorker.FALSE_PROTO);
+        return;
+      }
+      SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+      QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
+      QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+      }
+
+      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+        LOG.warn(attemptId + " Killed");
+        attempt.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_LOCAL_KILLED));
+      } else {
+        queryMasterTask.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId, request));
+      }
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /** Liveness check; always replies TRUE. */
+  @Override
+  public void ping(RpcController controller,
+                   TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  /** Forwards a task's fatal-error report to its QueryMasterTask as a TaskFatalErrorEvent. */
+  @Override
+  public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
+                         RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId queryId = new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+      if (queryMasterTask == null) {
+        LOG.warn("No QueryMasterTask for " + queryId + "; ignoring fatal error report");
+        done.run(TajoWorker.FALSE_PROTO);
+        return;
+      }
+      queryMasterTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /** Forwards a task's completion report to its QueryMasterTask as a TaskCompletionEvent. */
+  @Override
+  public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId queryId = new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+      if (queryMasterTask == null) {
+        LOG.warn("No QueryMasterTask for " + queryId + "; ignoring task completion report");
+        done.run(TajoWorker.FALSE_PROTO);
+        return;
+      }
+      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /**
+   * Sends a KILL event to the query. Always answers the RPC: TRUE when the event was
+   * dispatched, FALSE when the query is unknown or dispatching failed.
+   */
+  @Override
+  public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
+                        RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId queryId = new QueryId(request);
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+      if (queryMasterTask == null || queryMasterTask.getQuery() == null) {
+        LOG.warn("No running query found for " + queryId + "; ignoring kill request");
+        done.run(TajoWorker.FALSE_PROTO);
+        return;
+      }
+      queryMasterTask.getQuery().handle(new QueryEvent(queryId, QueryEventType.KILL));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /**
+   * Counts the request in worker metrics and submits a QueryStartEvent to the QueryMaster.
+   * Replies TRUE once the event is handed off, FALSE (and counts an error) on failure.
+   */
+  @Override
+  public void executeQuery(RpcController controller,
+                           TajoWorkerProtocol.QueryExecutionRequestProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
+
+      QueryId queryId = new QueryId(request.getQueryId());
+      LOG.info("Receive executeQuery request:" + queryId);
+      queryMaster.handle(new QueryStartEvent(queryId,
+          new Session(request.getSession()),
+          new QueryContext(request.getQueryContext()), request.getSql().getValue(),
+          request.getLogicalPlanJson().getValue()));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
new file mode 100644
index 0000000..56dd789
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+
+@Deprecated
+public class QueryMasterRunner extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class);
+ private TajoConf systemConf;
+ private QueryMaster queryMaster;
+ private QueryId queryId;
+ private String queryMasterManagerAddress;
+
+ public QueryMasterRunner(QueryId queryId, String queryMasterManagerAddress) {
+ super(QueryMasterRunner.class.getName());
+ this.queryId = queryId;
+ this.queryMasterManagerAddress = queryMasterManagerAddress;
+ }
+
+ private class ShutdownHook implements Runnable {
+ @Override
+ public void run() {
+ LOG.info("============================================");
+ LOG.info("QueryMaster received SIGINT Signal");
+ LOG.info("============================================");
+ stop();
+ }
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ this.systemConf = (TajoConf)conf;
+ RackResolver.init(systemConf);
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ //create QueryMaster
+ QueryMaster query = new QueryMaster(null);
+
+ query.init(systemConf);
+ query.start();
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ public static void main(String[] args) throws Exception {
+ LOG.info("QueryMasterRunner started");
+
+ final TajoConf conf = new TajoConf();
+ conf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
+ UserGroupInformation.setConfiguration(conf);
+
+ final QueryId queryId = TajoIdUtils.parseQueryId(args[0]);
+ final String queryMasterManagerAddr = args[1];
+
+ LOG.info("Received QueryId:" + queryId);
+
+ QueryMasterRunner queryMasterRunner = new QueryMasterRunner(queryId, queryMasterManagerAddr);
+ queryMasterRunner.init(conf);
+ queryMasterRunner.start();
+
+ synchronized(queryId) {
+ queryId.wait();
+ }
+
+ System.exit(0);
+ }
+
+ public static void printThreadInfo(PrintWriter stream, String title) {
+ ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ final int STACK_DEPTH = 60;
+ boolean contention = threadBean.isThreadContentionMonitoringEnabled();
+ long[] threadIds = threadBean.getAllThreadIds();
+ stream.println("Process Thread Dump: " + title);
+ stream.println(threadIds.length + " active threads");
+ for (long tid : threadIds) {
+ ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
+ if (info == null) {
+ stream.println(" Inactive");
+ continue;
+ }
+ stream.println("Thread " + getTaskName(info.getThreadId(), info.getThreadName()) + ":");
+ Thread.State state = info.getThreadState();
+ stream.println(" State: " + state);
+ stream.println(" Blocked count: " + info.getBlockedCount());
+ stream.println(" Waited count: " + info.getWaitedCount());
+ if (contention) {
+ stream.println(" Blocked time: " + info.getBlockedTime());
+ stream.println(" Waited time: " + info.getWaitedTime());
+ }
+ if (state == Thread.State.WAITING) {
+ stream.println(" Waiting on " + info.getLockName());
+ } else if (state == Thread.State.BLOCKED) {
+ stream.println(" Blocked on " + info.getLockName());
+ stream.println(" Blocked by " + getTaskName(info.getLockOwnerId(), info.getLockOwnerName()));
+ }
+ stream.println(" Stack:");
+ for (StackTraceElement frame : info.getStackTrace()) {
+ stream.println(" " + frame.toString());
+ }
+ }
+ stream.flush();
+ }
+
+ private static String getTaskName(long id, String name) {
+ if (name == null) {
+ return Long.toString(id);
+ }
+ return id + " (" + name + ")";
+ }
+}