You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/22 09:11:40 UTC
[5/9] tajo git commit: TAJO-1262: Rename the prefix 'SubQuery' to
'Stage'.
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
deleted file mode 100644
index d4cc6e7..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ /dev/null
@@ -1,1343 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
-import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.*;
-import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.storage.FileStorageManager;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.util.history.SubQueryHistory;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-
-
-/**
- * SubQuery plays a role in controlling an ExecutionBlock and is a finite state machine.
- */
-public class SubQuery implements EventHandler<SubQueryEvent> {
-
- private static final Log LOG = LogFactory.getLog(SubQuery.class);
-
- private MasterPlan masterPlan;
- private ExecutionBlock block;
- private int priority;
- private Schema schema;
- private TableMeta meta;
- private TableStats resultStatistics;
- private TableStats inputStatistics;
- private EventHandler<Event> eventHandler;
- private AbstractTaskScheduler taskScheduler;
- private QueryMasterTask.QueryMasterTaskContext context;
- private final List<String> diagnostics = new ArrayList<String>();
- private SubQueryState subQueryState;
-
- private long startTime;
- private long finishTime;
-
- volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
- volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
- TajoContainer>();
-
- private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
- private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
- private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
- private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
- private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
- new AllocatedContainersCancelTransition();
- private static final SubQueryCompleteTransition SUBQUERY_COMPLETED_TRANSITION =
- new SubQueryCompleteTransition();
- private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> stateMachine;
-
- protected static final StateMachineFactory<SubQuery, SubQueryState,
- SubQueryEventType, SubQueryEvent> stateMachineFactory =
- new StateMachineFactory <SubQuery, SubQueryState,
- SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
-
- // Transitions from NEW state
- .addTransition(SubQueryState.NEW,
- EnumSet.of(SubQueryState.INITED, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
- SubQueryEventType.SQ_INIT,
- new InitAndRequestContainer())
- .addTransition(SubQueryState.NEW, SubQueryState.NEW,
- SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.NEW, SubQueryState.KILLED,
- SubQueryEventType.SQ_KILL)
- .addTransition(SubQueryState.NEW, SubQueryState.ERROR,
- SubQueryEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from INITED state
- .addTransition(SubQueryState.INITED, SubQueryState.RUNNING,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINER_LAUNCH_TRANSITION)
- .addTransition(SubQueryState.INITED, SubQueryState.INITED,
- SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.INITED, SubQueryState.KILL_WAIT,
- SubQueryEventType.SQ_KILL, new KillTasksTransition())
- .addTransition(SubQueryState.INITED, SubQueryState.ERROR,
- SubQueryEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from RUNNING state
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINER_LAUNCH_TRANSITION)
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
- SubQueryEventType.SQ_TASK_COMPLETED,
- TASK_COMPLETED_TRANSITION)
- .addTransition(SubQueryState.RUNNING,
- EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED),
- SubQueryEventType.SQ_SUBQUERY_COMPLETED,
- SUBQUERY_COMPLETED_TRANSITION)
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
- SubQueryEventType.SQ_FAILED,
- TASK_COMPLETED_TRANSITION)
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
- SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.RUNNING, SubQueryState.KILL_WAIT,
- SubQueryEventType.SQ_KILL,
- new KillTasksTransition())
- .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
- SubQueryEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able Transition
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
- SubQueryEventType.SQ_START)
-
- // Transitions from KILL_WAIT state
- .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
- EnumSet.of(SubQueryEventType.SQ_KILL), new KillTasksTransition())
- .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
- SubQueryEventType.SQ_TASK_COMPLETED,
- TASK_COMPLETED_TRANSITION)
- .addTransition(SubQueryState.KILL_WAIT,
- EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED, SubQueryState.KILLED),
- SubQueryEventType.SQ_SUBQUERY_COMPLETED,
- SUBQUERY_COMPLETED_TRANSITION)
- .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
- SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
- SubQueryEventType.SQ_FAILED,
- TASK_COMPLETED_TRANSITION)
- .addTransition(SubQueryState.KILL_WAIT, SubQueryState.ERROR,
- SubQueryEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from SUCCEEDED state
- .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
- SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR,
- SubQueryEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able events
- .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
- EnumSet.of(
- SubQueryEventType.SQ_START,
- SubQueryEventType.SQ_KILL,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED))
-
- // Transitions from KILLED state
- .addTransition(SubQueryState.KILLED, SubQueryState.KILLED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(SubQueryState.KILLED, SubQueryState.KILLED,
- SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.KILLED, SubQueryState.ERROR,
- SubQueryEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able transitions
- .addTransition(SubQueryState.KILLED, SubQueryState.KILLED,
- EnumSet.of(
- SubQueryEventType.SQ_START,
- SubQueryEventType.SQ_KILL,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- SubQueryEventType.SQ_FAILED))
-
- // Transitions from FAILED state
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.FAILED, SubQueryState.ERROR,
- SubQueryEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able transitions
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- EnumSet.of(
- SubQueryEventType.SQ_START,
- SubQueryEventType.SQ_KILL,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- SubQueryEventType.SQ_FAILED))
-
- // Transitions from ERROR state
- .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
- SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- // Ignore-able transitions
- .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
- EnumSet.of(
- SubQueryEventType.SQ_START,
- SubQueryEventType.SQ_KILL,
- SubQueryEventType.SQ_FAILED,
- SubQueryEventType.SQ_INTERNAL_ERROR,
- SubQueryEventType.SQ_SUBQUERY_COMPLETED))
-
- .installTopology();
-
- private final Lock readLock;
- private final Lock writeLock;
-
- private int totalScheduledObjectsCount;
- private int succeededObjectCount = 0;
- private int completedTaskCount = 0;
- private int succeededTaskCount = 0;
- private int killedObjectCount = 0;
- private int failedObjectCount = 0;
- private TaskSchedulerContext schedulerContext;
- private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
- private AtomicInteger completeReportReceived = new AtomicInteger(0);
- private SubQueryHistory finalSubQueryHistory;
-
- public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
- this.context = context;
- this.masterPlan = masterPlan;
- this.block = block;
- this.eventHandler = context.getEventHandler();
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
- stateMachine = stateMachineFactory.make(this);
- subQueryState = stateMachine.getCurrentState();
- }
-
- public static boolean isRunningState(SubQueryState state) {
- return state == SubQueryState.INITED || state == SubQueryState.NEW || state == SubQueryState.RUNNING;
- }
-
- public QueryMasterTask.QueryMasterTaskContext getContext() {
- return context;
- }
-
- public MasterPlan getMasterPlan() {
- return masterPlan;
- }
-
- public DataChannel getDataChannel() {
- return masterPlan.getOutgoingChannels(getId()).iterator().next();
- }
-
- public EventHandler<Event> getEventHandler() {
- return eventHandler;
- }
-
- public AbstractTaskScheduler getTaskScheduler() {
- return taskScheduler;
- }
-
- public void setStartTime() {
- startTime = context.getClock().getTime();
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public long getStartTime() {
- return this.startTime;
- }
-
- public void setFinishTime() {
- finishTime = context.getClock().getTime();
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public long getFinishTime() {
- return this.finishTime;
- }
-
- public float getTaskProgress() {
- readLock.lock();
- try {
- if (getState() == SubQueryState.NEW) {
- return 0;
- } else {
- return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
- }
- } finally {
- readLock.unlock();
- }
- }
-
- public float getProgress() {
- List<Task> tempTasks = null;
- readLock.lock();
- try {
- if (getState() == SubQueryState.NEW) {
- return 0.0f;
- } else {
- tempTasks = new ArrayList<Task>(tasks.values());
- }
- } finally {
- readLock.unlock();
- }
-
- float totalProgress = 0.0f;
- for (Task eachTask : tempTasks) {
- if (eachTask.getLastAttempt() != null) {
- totalProgress += eachTask.getLastAttempt().getProgress();
- }
- }
-
- if (totalProgress > 0.0f) {
- return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f;
- } else {
- return 0.0f;
- }
- }
-
- public int getSucceededObjectCount() {
- return succeededObjectCount;
- }
-
- public int getTotalScheduledObjectsCount() {
- return totalScheduledObjectsCount;
- }
-
- public ExecutionBlock getBlock() {
- return block;
- }
-
- public void addTask(Task task) {
- tasks.put(task.getId(), task);
- }
-
- public SubQueryHistory getSubQueryHistory() {
- if (finalSubQueryHistory != null) {
- if (finalSubQueryHistory.getFinishTime() == 0) {
- finalSubQueryHistory = makeSubQueryHistory();
- finalSubQueryHistory.setTasks(makeTaskHistories());
- }
- return finalSubQueryHistory;
- } else {
- return makeSubQueryHistory();
- }
- }
-
- private List<TaskHistory> makeTaskHistories() {
- List<TaskHistory> taskHistories = new ArrayList<TaskHistory>();
-
- for(Task eachTask : getTasks()) {
- taskHistories.add(eachTask.getTaskHistory());
- }
-
- return taskHistories;
- }
-
- private SubQueryHistory makeSubQueryHistory() {
- SubQueryHistory subQueryHistory = new SubQueryHistory();
-
- subQueryHistory.setExecutionBlockId(getId().toString());
- subQueryHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan()));
- subQueryHistory.setState(getState().toString());
- subQueryHistory.setStartTime(startTime);
- subQueryHistory.setFinishTime(finishTime);
- subQueryHistory.setSucceededObjectCount(succeededObjectCount);
- subQueryHistory.setKilledObjectCount(killedObjectCount);
- subQueryHistory.setFailedObjectCount(failedObjectCount);
- subQueryHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount);
- subQueryHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned());
- subQueryHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned());
-
- long totalInputBytes = 0;
- long totalReadBytes = 0;
- long totalReadRows = 0;
- long totalWriteBytes = 0;
- long totalWriteRows = 0;
- int numShuffles = 0;
- for(Task eachTask : getTasks()) {
- numShuffles = eachTask.getShuffleOutpuNum();
- if (eachTask.getLastAttempt() != null) {
- TableStats inputStats = eachTask.getLastAttempt().getInputStats();
- if (inputStats != null) {
- totalInputBytes += inputStats.getNumBytes();
- totalReadBytes += inputStats.getReadBytes();
- totalReadRows += inputStats.getNumRows();
- }
- TableStats outputStats = eachTask.getLastAttempt().getResultStats();
- if (outputStats != null) {
- totalWriteBytes += outputStats.getNumBytes();
- totalWriteRows += outputStats.getNumRows();
- }
- }
- }
-
- subQueryHistory.setTotalInputBytes(totalInputBytes);
- subQueryHistory.setTotalReadBytes(totalReadBytes);
- subQueryHistory.setTotalReadRows(totalReadRows);
- subQueryHistory.setTotalWriteBytes(totalWriteBytes);
- subQueryHistory.setTotalWriteRows(totalWriteRows);
- subQueryHistory.setNumShuffles(numShuffles);
- subQueryHistory.setProgress(getProgress());
- return subQueryHistory;
- }
-
- /**
- * It finalizes this subquery. It is only invoked when the subquery is succeeded.
- */
- public void complete() {
- cleanup();
- finalizeStats();
- setFinishTime();
- eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED));
- }
-
- /**
- * It finalizes this subquery. Unlike {@link SubQuery#complete()},
- * it is invoked when a subquery is abnormally finished.
- *
- * @param finalState The final subquery state
- */
- public void abort(SubQueryState finalState) {
- // TODO -
- // - committer.abortSubQuery(...)
- // - record SubQuery Finish Time
- // - CleanUp Tasks
- // - Record History
- cleanup();
- setFinishTime();
- eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
- }
-
- public StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> getStateMachine() {
- return this.stateMachine;
- }
-
- public void setPriority(int priority) {
- this.priority = priority;
- }
-
-
- public int getPriority() {
- return this.priority;
- }
-
- public ExecutionBlockId getId() {
- return block.getId();
- }
-
- public Task[] getTasks() {
- return tasks.values().toArray(new Task[tasks.size()]);
- }
-
- public Task getTask(TaskId qid) {
- return tasks.get(qid);
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- public TableMeta getTableMeta() {
- return meta;
- }
-
- public TableStats getResultStats() {
- return resultStatistics;
- }
-
- public TableStats getInputStats() {
- return inputStatistics;
- }
-
- public List<String> getDiagnostics() {
- readLock.lock();
- try {
- return diagnostics;
- } finally {
- readLock.unlock();
- }
- }
-
- protected void addDiagnostic(String diag) {
- diagnostics.add(diag);
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(this.getId());
- return sb.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof SubQuery) {
- SubQuery other = (SubQuery)o;
- return getId().equals(other.getId());
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return getId().hashCode();
- }
-
- public int compareTo(SubQuery other) {
- return getId().compareTo(other.getId());
- }
-
- public SubQueryState getSynchronizedState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- /* non-blocking call for client API */
- public SubQueryState getState() {
- return subQueryState;
- }
-
- public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) {
- TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
- long[] avgRows = new long[]{0, 0};
- long[] numBytes = new long[]{0, 0};
- long[] readBytes = new long[]{0, 0};
- long[] numRows = new long[]{0, 0};
- int[] numBlocks = new int[]{0, 0};
- int[] numOutputs = new int[]{0, 0};
-
- List<ColumnStats> columnStatses = Lists.newArrayList();
-
- MasterPlan masterPlan = subQuery.getMasterPlan();
- Iterator<ExecutionBlock> it = masterPlan.getChilds(subQuery.getBlock()).iterator();
- while (it.hasNext()) {
- ExecutionBlock block = it.next();
- SubQuery childSubQuery = subQuery.context.getSubQuery(block.getId());
- TableStats[] childStatArray = new TableStats[]{
- childSubQuery.getInputStats(), childSubQuery.getResultStats()
- };
- for (int i = 0; i < 2; i++) {
- if (childStatArray[i] == null) {
- continue;
- }
- avgRows[i] += childStatArray[i].getAvgRows();
- numBlocks[i] += childStatArray[i].getNumBlocks();
- numBytes[i] += childStatArray[i].getNumBytes();
- readBytes[i] += childStatArray[i].getReadBytes();
- numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
- numRows[i] += childStatArray[i].getNumRows();
- }
- columnStatses.addAll(childStatArray[1].getColumnStats());
- }
-
- for (int i = 0; i < 2; i++) {
- stat[i].setNumBlocks(numBlocks[i]);
- stat[i].setNumBytes(numBytes[i]);
- stat[i].setReadBytes(readBytes[i]);
- stat[i].setNumShuffleOutputs(numOutputs[i]);
- stat[i].setNumRows(numRows[i]);
- stat[i].setAvgRows(avgRows[i]);
- }
- stat[1].setColumnStats(columnStatses);
-
- return stat;
- }
-
- private TableStats[] computeStatFromTasks() {
- List<TableStats> inputStatsList = Lists.newArrayList();
- List<TableStats> resultStatsList = Lists.newArrayList();
- for (Task unit : getTasks()) {
- resultStatsList.add(unit.getStats());
- if (unit.getLastAttempt().getInputStats() != null) {
- inputStatsList.add(unit.getLastAttempt().getInputStats());
- }
- }
- TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
- TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
- return new TableStats[]{inputStats, resultStats};
- }
-
- private void stopScheduler() {
- // If there are launched TaskRunners, send the 'shouldDie' message to all r
- // via received task requests.
- if (taskScheduler != null) {
- taskScheduler.stop();
- }
- }
-
- private void releaseContainers() {
- // If there are still live TaskRunners, try to kill the containers.
- eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
- }
-
- /**
- * It computes all stats and sets the intermediate result.
- */
- private void finalizeStats() {
- TableStats[] statsArray;
- if (block.hasUnion()) {
- statsArray = computeStatFromUnionBlock(this);
- } else {
- statsArray = computeStatFromTasks();
- }
-
- DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
-
- // if store plan (i.e., CREATE or INSERT OVERWRITE)
- StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
- if (storeType == null) {
- // get default or store type
- storeType = StoreType.CSV;
- }
-
- schema = channel.getSchema();
- meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
- inputStatistics = statsArray[0];
- resultStatistics = statsArray[1];
- }
-
- @Override
- public void handle(SubQueryEvent event) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType() + ", preState=" + getSynchronizedState());
- }
-
- try {
- writeLock.lock();
- SubQueryState oldState = getSynchronizedState();
- try {
- getStateMachine().doTransition(event.getType(), event);
- subQueryState = getSynchronizedState();
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state"
- + ", eventType:" + event.getType().name()
- + ", oldState:" + oldState.name()
- + ", nextState:" + getSynchronizedState().name()
- , e);
- eventHandler.handle(new SubQueryEvent(getId(),
- SubQueryEventType.SQ_INTERNAL_ERROR));
- }
-
- // notify the eventhandler of state change
- if (LOG.isDebugEnabled()) {
- if (oldState != getSynchronizedState()) {
- LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to "
- + getSynchronizedState());
- }
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public void handleTaskRequestEvent(TaskRequestEvent event) {
- taskScheduler.handleTaskRequestEvent(event);
- }
-
- private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
- SubQueryEvent, SubQueryState> {
-
- @Override
- public SubQueryState transition(final SubQuery subQuery, SubQueryEvent subQueryEvent) {
- subQuery.setStartTime();
- ExecutionBlock execBlock = subQuery.getBlock();
- SubQueryState state;
-
- try {
- // Union operator does not require actual query processing. It is performed logically.
- if (execBlock.hasUnion()) {
- subQuery.finalizeStats();
- state = SubQueryState.SUCCEEDED;
- } else {
- // execute pre-processing asyncronously
- subQuery.getContext().getQueryMasterContext().getEventExecutor()
- .submit(new Runnable() {
- @Override
- public void run() {
- try {
- ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
- DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
- setShuffleIfNecessary(subQuery, channel);
- initTaskScheduler(subQuery);
- schedule(subQuery);
- subQuery.totalScheduledObjectsCount = subQuery.getTaskScheduler().remainingScheduledObjectNum();
- LOG.info(subQuery.totalScheduledObjectsCount + " objects are scheduled");
-
- if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
- subQuery.complete();
- } else {
- if(subQuery.getSynchronizedState() == SubQueryState.INITED) {
- subQuery.taskScheduler.start();
- allocateContainers(subQuery);
- } else {
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
- }
- }
- } catch (Throwable e) {
- LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e);
- subQuery.setFinishTime();
- subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage()));
- subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.ERROR));
- }
- }
- }
- );
- state = SubQueryState.INITED;
- }
- } catch (Throwable e) {
- LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e);
- subQuery.setFinishTime();
- subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage()));
- subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.ERROR));
- return SubQueryState.ERROR;
- }
-
- return state;
- }
-
- private void initTaskScheduler(SubQuery subQuery) throws IOException {
- TajoConf conf = subQuery.context.getConf();
- subQuery.schedulerContext = new TaskSchedulerContext(subQuery.context,
- subQuery.getMasterPlan().isLeaf(subQuery.getId()), subQuery.getId());
- subQuery.taskScheduler = TaskSchedulerFactory.get(conf, subQuery.schedulerContext, subQuery);
- subQuery.taskScheduler.init(conf);
- LOG.info(subQuery.taskScheduler.getName() + " is chosen for the task scheduling for " + subQuery.getId());
- }
-
- /**
- * If a parent block requires a repartition operation, the method sets proper repartition
- * methods and the number of partitions to a given subquery.
- */
- private static void setShuffleIfNecessary(SubQuery subQuery, DataChannel channel) {
- if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
- int numTasks = calculateShuffleOutputNum(subQuery, channel);
- Repartitioner.setShuffleOutputNumForTwoPhase(subQuery, numTasks, channel);
- }
- }
-
- /**
- * Getting the total memory of cluster
- *
- * @param subQuery
- * @return mega bytes
- */
- private static int getClusterTotalMemory(SubQuery subQuery) {
- List<TajoMasterProtocol.WorkerResourceProto> workers =
- subQuery.context.getQueryMasterContext().getQueryMaster().getAllWorker();
-
- int totalMem = 0;
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
- totalMem += worker.getMemoryMB();
- }
- return totalMem;
- }
- /**
- * Getting the desire number of partitions according to the volume of input data.
- * This method is only used to determine the partition key number of hash join or aggregation.
- *
- * @param subQuery
- * @return
- */
- public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel channel) {
- TajoConf conf = subQuery.context.getConf();
- MasterPlan masterPlan = subQuery.getMasterPlan();
- ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock());
-
- LogicalNode grpNode = null;
- if (parent != null) {
- grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
- if (grpNode == null) {
- grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY);
- }
- }
-
- // We assume this execution block the first stage of join if two or more tables are included in this block,
- if (parent != null && parent.getScanNodes().length >= 2) {
- List<ExecutionBlock> childs = masterPlan.getChilds(parent);
-
- // for outer
- ExecutionBlock outer = childs.get(0);
- long outerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, outer);
-
- // for inner
- ExecutionBlock inner = childs.get(1);
- long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
- LOG.info(subQuery.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
- + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
-
- long bigger = Math.max(outerVolume, innerVolume);
-
- int mb = (int) Math.ceil((double) bigger / 1048576);
- LOG.info(subQuery.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
-
- int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
-
- if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
- taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
- LOG.warn("!!!!! TESTCASE MODE !!!!!");
- }
-
- // The shuffle output numbers of join may be inconsistent by execution block order.
- // Thus, we need to compare the number with DataChannel output numbers.
- // If the number is right, the number and DataChannel output numbers will be consistent.
- int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0;
- for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
- outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum());
- }
- for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
- innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
- }
- if (outerShuffleOutputNum != innerShuffleOutputNum
- && taskNum != outerShuffleOutputNum
- && taskNum != innerShuffleOutputNum) {
- LOG.info(subQuery.getId() + ", Change determined number of join partitions cause difference of outputNum" +
- ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) +
- ", outerShuffleOutptNum=" + outerShuffleOutputNum +
- ", innerShuffleOutputNum=" + innerShuffleOutputNum);
- taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum);
- }
-
- LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
-
- return taskNum;
- // Is this subquery the first step of group-by?
- } else if (grpNode != null) {
- boolean hasGroupColumns = true;
- if (grpNode.getType() == NodeType.GROUP_BY) {
- hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
- } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
- // Find current distinct stage node.
- DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
- if (distinctNode == null) {
- LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode");
- distinctNode = (DistinctGroupbyNode)grpNode;
- }
- hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
-
- Enforcer enforcer = subQuery.getBlock().getEnforcer();
- if (enforcer == null) {
- LOG.warn(subQuery.getId() + ", DistinctGroupbyNode's enforcer is null.");
- }
- EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
- if (property != null) {
- if (property.getDistinct().getIsMultipleAggregation()) {
- MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage();
- if (stage != MultipleAggregationStage.THRID_STAGE) {
- hasGroupColumns = true;
- }
- }
- }
- }
- if (!hasGroupColumns) {
- LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
- return 1;
- } else {
- long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
-
- int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB);
- LOG.info(subQuery.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
- // determine the number of task
- int taskNum = (int) Math.ceil((double) volumeByMB /
- masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
- LOG.info(subQuery.getId() + ", The determined number of aggregation partitions is " + taskNum);
- return taskNum;
- }
- } else {
- LOG.info("============>>>>> Unexpected Case! <<<<<================");
- long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
-
- int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
- // determine the number of task per 128MB
- int taskNum = (int) Math.ceil((double)mb / 128);
- LOG.info(subQuery.getId() + ", The determined number of partitions is " + taskNum);
- return taskNum;
- }
- }
-
- private static void schedule(SubQuery subQuery) throws IOException {
- MasterPlan masterPlan = subQuery.getMasterPlan();
- ExecutionBlock execBlock = subQuery.getBlock();
- if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
- scheduleFragmentsForLeafQuery(subQuery);
- } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
- Repartitioner.scheduleFragmentsForJoinQuery(subQuery.schedulerContext, subQuery);
- } else { // Case 3: Others (Sort or Aggregation)
- int numTasks = getNonLeafTaskNum(subQuery);
- Repartitioner.scheduleFragmentsForNonLeafTasks(subQuery.schedulerContext, masterPlan, subQuery, numTasks);
- }
- }
-
- /**
- * Getting the desire number of tasks according to the volume of input data
- *
- * @param subQuery
- * @return
- */
- public static int getNonLeafTaskNum(SubQuery subQuery) {
- // Getting intermediate data size
- long volume = getInputVolume(subQuery.getMasterPlan(), subQuery.context, subQuery.getBlock());
-
- int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
- // determine the number of task per 64MB
- int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
- LOG.info(subQuery.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
- return maxTaskNum;
- }
-
- public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context,
- ExecutionBlock execBlock) {
- Map<String, TableDesc> tableMap = context.getTableDescMap();
- if (masterPlan.isLeaf(execBlock)) {
- ScanNode[] outerScans = execBlock.getScanNodes();
- long maxVolume = 0;
- for (ScanNode eachScanNode: outerScans) {
- TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats();
- if (stat.getNumBytes() > maxVolume) {
- maxVolume = stat.getNumBytes();
- }
- }
- return maxVolume;
- } else {
- long aggregatedVolume = 0;
- for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
- SubQuery subquery = context.getSubQuery(childBlock.getId());
- if (subquery == null || subquery.getSynchronizedState() != SubQueryState.SUCCEEDED) {
- aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
- } else {
- aggregatedVolume += subquery.getResultStats().getNumBytes();
- }
- }
-
- return aggregatedVolume;
- }
- }
-
- public static void allocateContainers(SubQuery subQuery) {
- ExecutionBlock execBlock = subQuery.getBlock();
-
- //TODO consider disk slot
- int requiredMemoryMBPerTask = 512;
-
- int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
- subQuery.getContext().getQueryMasterContext().getWorkerContext(),
- subQuery.schedulerContext.getEstimatedTaskNum(),
- requiredMemoryMBPerTask
- );
-
- final Resource resource = Records.newRecord(Resource.class);
-
- resource.setMemory(requiredMemoryMBPerTask);
-
- LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
-
- Priority priority = Records.newRecord(Priority.class);
- priority.setPriority(subQuery.getPriority());
- ContainerAllocationEvent event =
- new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
- subQuery.getId(), priority, resource, numRequest,
- subQuery.masterPlan.isLeaf(execBlock), 0.0f);
- subQuery.eventHandler.handle(event);
- }
-
- private static void scheduleFragmentsForLeafQuery(SubQuery subQuery) throws IOException {
- ExecutionBlock execBlock = subQuery.getBlock();
- ScanNode[] scans = execBlock.getScanNodes();
- Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
- ScanNode scan = scans[0];
- TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
-
- Collection<Fragment> fragments;
- TableMeta meta = table.getMeta();
-
- // Depending on scanner node's type, it creates fragments. If scan is for
- // a partitioned table, It will creates lots fragments for all partitions.
- // Otherwise, it creates at least one fragments for a table, which may
- // span a number of blocks or possibly consists of a number of files.
- if (scan.getType() == NodeType.PARTITIONS_SCAN) {
- // After calling this method, partition paths are removed from the physical plan.
- FileStorageManager storageManager =
- (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
- fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table);
- } else {
- StorageManager storageManager =
- StorageManager.getStorageManager(subQuery.getContext().getConf(), meta.getStoreType());
- fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan);
- }
-
- SubQuery.scheduleFragments(subQuery, fragments);
- if (subQuery.getTaskScheduler() instanceof DefaultTaskScheduler) {
- //Leaf task of DefaultTaskScheduler should be fragment size
- // EstimatedTaskNum determined number of initial container
- subQuery.schedulerContext.setEstimatedTaskNum(fragments.size());
- } else {
- TajoConf conf = subQuery.context.getConf();
- subQuery.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
- int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
- (double) subQuery.schedulerContext.getTaskSize());
- subQuery.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
- }
- }
- }
-
- public static void scheduleFragment(SubQuery subQuery, Fragment fragment) {
- subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
- subQuery.getId(), fragment));
- }
-
-
- public static void scheduleFragments(SubQuery subQuery, Collection<Fragment> fragments) {
- for (Fragment eachFragment : fragments) {
- scheduleFragment(subQuery, eachFragment);
- }
- }
-
- public static void scheduleFragments(SubQuery subQuery, Collection<Fragment> leftFragments,
- Collection<Fragment> broadcastFragments) {
- for (Fragment eachLeafFragment : leftFragments) {
- scheduleFragment(subQuery, eachLeafFragment, broadcastFragments);
- }
- }
-
- public static void scheduleFragment(SubQuery subQuery,
- Fragment leftFragment, Collection<Fragment> rightFragments) {
- subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
- subQuery.getId(), leftFragment, rightFragments));
- }
-
- public static void scheduleFetches(SubQuery subQuery, Map<String, List<FetchImpl>> fetches) {
- subQuery.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
- subQuery.getId(), fetches));
- }
-
- public static Task newEmptyTask(TaskSchedulerContext schedulerContext,
- TaskAttemptScheduleContext taskContext,
- SubQuery subQuery, int taskId) {
- ExecutionBlock execBlock = subQuery.getBlock();
- Task unit = new Task(schedulerContext.getMasterContext().getConf(),
- taskContext,
- QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId),
- schedulerContext.isLeafQuery(), subQuery.eventHandler);
- unit.setLogicalPlan(execBlock.getPlan());
- subQuery.addTask(unit);
- return unit;
- }
-
- private static class ContainerLaunchTransition
- implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
- @Override
- public void transition(SubQuery subQuery, SubQueryEvent event) {
- try {
- SubQueryContainerAllocationEvent allocationEvent =
- (SubQueryContainerAllocationEvent) event;
- for (TajoContainer container : allocationEvent.getAllocatedContainer()) {
- TajoContainerId cId = container.getId();
- if (subQuery.containers.containsKey(cId)) {
- subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
- "Duplicated containers are allocated: " + cId.toString()));
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
- }
- subQuery.containers.put(cId, container);
- }
- LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
- subQuery.eventHandler.handle(
- new LaunchTaskRunnersEvent(subQuery.getId(), allocationEvent.getAllocatedContainer(),
- subQuery.getContext().getQueryContext(),
- CoreGsonHelper.toJson(subQuery.getBlock().getPlan(), LogicalNode.class))
- );
-
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
- } catch (Throwable t) {
- subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
- ExceptionUtils.getStackTrace(t)));
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
- }
- }
- }
-
- /**
- * It is used in KILL_WAIT state against Contained Allocated event.
- * It just returns allocated containers to resource manager.
- */
- private static class AllocatedContainersCancelTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
- @Override
- public void transition(SubQuery subQuery, SubQueryEvent event) {
- try {
- SubQueryContainerAllocationEvent allocationEvent =
- (SubQueryContainerAllocationEvent) event;
- subQuery.eventHandler.handle(
- new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
- subQuery.getId(), allocationEvent.getAllocatedContainer()));
- LOG.info(String.format("[%s] %d allocated containers are canceled",
- subQuery.getId().toString(),
- allocationEvent.getAllocatedContainer().size()));
- } catch (Throwable t) {
- subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
- ExceptionUtils.getStackTrace(t)));
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
- }
- }
- }
-
- private static class TaskCompletedTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
- @Override
- public void transition(SubQuery subQuery,
- SubQueryEvent event) {
- SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
- Task task = subQuery.getTask(taskEvent.getTaskId());
-
- if (task == null) { // task failed
- LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED));
- } else {
- subQuery.completedTaskCount++;
-
- if (taskEvent.getState() == TaskState.SUCCEEDED) {
- subQuery.succeededObjectCount++;
- } else if (task.getState() == TaskState.KILLED) {
- subQuery.killedObjectCount++;
- } else if (task.getState() == TaskState.FAILED) {
- subQuery.failedObjectCount++;
- // if at least one task is failed, try to kill all tasks.
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
- }
-
- LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
- subQuery.getId(),
- subQuery.getTotalScheduledObjectsCount(),
- subQuery.succeededObjectCount,
- subQuery.killedObjectCount,
- subQuery.failedObjectCount));
-
- if (subQuery.totalScheduledObjectsCount ==
- subQuery.succeededObjectCount + subQuery.killedObjectCount + subQuery.failedObjectCount) {
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_SUBQUERY_COMPLETED));
- }
- }
- }
- }
-
- private static class KillTasksTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
- @Override
- public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- if(subQuery.getTaskScheduler() != null){
- subQuery.getTaskScheduler().stop();
- }
-
- for (Task task : subQuery.getTasks()) {
- subQuery.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL));
- }
- }
- }
-
- private void cleanup() {
- stopScheduler();
- releaseContainers();
-
- if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
- List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
- List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
-
- for (ExecutionBlock executionBlock : childs) {
- ebIds.add(executionBlock.getId().getProto());
- }
-
- getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
- }
-
- this.finalSubQueryHistory = makeSubQueryHistory();
- this.finalSubQueryHistory.setTasks(makeTaskHistories());
- }
-
- public List<IntermediateEntry> getHashShuffleIntermediateEntries() {
- return hashShuffleIntermediateEntries;
- }
-
- protected void waitingIntermediateReport() {
- LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get());
- synchronized(completeReportReceived) {
- long startTime = System.currentTimeMillis();
- while (true) {
- if (completeReportReceived.get() >= tasks.size()) {
- LOG.info(getId() + ", completed waiting IntermediateReport");
- return;
- } else {
- try {
- completeReportReceived.wait(10 * 1000);
- } catch (InterruptedException e) {
- }
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (elapsedTime >= 120 * 1000) {
- LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
- abort(SubQueryState.FAILED);
- return;
- }
- }
- }
- }
- }
-
- public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
- LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks());
- if (!report.getReportSuccess()) {
- LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage());
- abort(SubQueryState.FAILED);
- return;
- }
- if (report.getIntermediateEntriesCount() > 0) {
- synchronized (hashShuffleIntermediateEntries) {
- for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
- hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
- }
- }
- }
- synchronized(completeReportReceived) {
- completeReportReceived.addAndGet(report.getSucceededTasks());
- completeReportReceived.notifyAll();
- }
- }
-
- private static class SubQueryCompleteTransition
- implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
-
- @Override
- public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- // TODO - Commit subQuery
- // TODO - records succeeded, failed, killed completed task
- // TODO - records metrics
- try {
- LOG.info(String.format("subQuery completed - %s (total=%d, success=%d, killed=%d)",
- subQuery.getId().toString(),
- subQuery.getTotalScheduledObjectsCount(),
- subQuery.getSucceededObjectCount(),
- subQuery.killedObjectCount));
-
- if (subQuery.killedObjectCount > 0 || subQuery.failedObjectCount > 0) {
- if (subQuery.failedObjectCount > 0) {
- subQuery.abort(SubQueryState.FAILED);
- return SubQueryState.FAILED;
- } else if (subQuery.killedObjectCount > 0) {
- subQuery.abort(SubQueryState.KILLED);
- return SubQueryState.KILLED;
- } else {
- LOG.error("Invalid State " + subQuery.getSynchronizedState() + " State");
- subQuery.abort(SubQueryState.ERROR);
- return SubQueryState.ERROR;
- }
- } else {
- subQuery.complete();
- return SubQueryState.SUCCEEDED;
- }
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- subQuery.abort(SubQueryState.ERROR);
- return SubQueryState.ERROR;
- }
- }
- }
-
- private static class DiagnosticsUpdateTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
- @Override
- public void transition(SubQuery subQuery, SubQueryEvent event) {
- subQuery.addDiagnostic(((SubQueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
- }
- }
-
- private static class InternalErrorTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
- @Override
- public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- subQuery.abort(SubQueryState.ERROR);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
deleted file mode 100644
index effcfde..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-public enum SubQueryState {
- NEW,
- INITED,
- RUNNING,
- SUCCEEDED,
- FAILED,
- KILL_WAIT,
- KILLED,
- ERROR
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
index 7de3933..5475791 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
@@ -580,7 +580,7 @@ public class Task implements EventHandler<TaskEvent> {
@Override
public void transition(Task task, TaskEvent taskEvent) {
- task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+ task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.KILLED));
}
}
@@ -598,7 +598,7 @@ public class Task implements EventHandler<TaskEvent> {
@Override
public void transition(Task task, TaskEvent event) {
task.finishTask();
- task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+ task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.KILLED));
}
}
@@ -617,7 +617,7 @@ public class Task implements EventHandler<TaskEvent> {
task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
task.finishTask();
- task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
+ task.eventHandler.handle(new StageTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
}
}
@@ -644,7 +644,7 @@ public class Task implements EventHandler<TaskEvent> {
task.finishedAttempts++;
task.finishTask();
- task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
+ task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED));
}
}
@@ -669,7 +669,7 @@ public class Task implements EventHandler<TaskEvent> {
}
} else {
task.finishTask();
- task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
+ task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED));
return TaskState.FAILED;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
index 63c6dbb..0f161ff 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java
@@ -420,11 +420,11 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
+ ", nextState:" + getState().name()
, e);
eventHandler.handle(
- new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
+ new StageDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
"Can't handle this event at current state of " + event.getTaskAttemptId() + ")"));
eventHandler.handle(
- new SubQueryEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
- SubQueryEventType.SQ_INTERNAL_ERROR));
+ new StageEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(),
+ StageEventType.SQ_INTERNAL_ERROR));
}
//notify the eventhandler of state change
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index 0573197..6050617 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -28,9 +28,9 @@ import org.apache.tajo.master.ha.HAService;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.Task;
-import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.Stage;
import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.util.history.SubQueryHistory;
+import org.apache.tajo.util.history.StageHistory;
import org.apache.tajo.worker.TaskRunnerHistory;
import org.apache.tajo.worker.TaskRunner;
@@ -144,50 +144,50 @@ public class JSPUtil {
return queryProgressList;
}
- public static List<SubQuery> sortSubQuery(Collection<SubQuery> subQueries) {
- List<SubQuery> subQueryList = new ArrayList<SubQuery>(subQueries);
- Collections.sort(subQueryList, new Comparator<SubQuery>() {
+ public static List<Stage> sortStages(Collection<Stage> stages) {
+ List<Stage> stageList = new ArrayList<Stage>(stages);
+ Collections.sort(stageList, new Comparator<Stage>() {
@Override
- public int compare(SubQuery subQuery1, SubQuery subQuery2) {
- long q1StartTime = subQuery1.getStartTime();
- long q2StartTime = subQuery2.getStartTime();
+ public int compare(Stage stage1, Stage stage2) {
+ long q1StartTime = stage1.getStartTime();
+ long q2StartTime = stage2.getStartTime();
q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime);
q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime);
int result = compareLong(q1StartTime, q2StartTime);
if (result == 0) {
- return subQuery1.getId().toString().compareTo(subQuery2.getId().toString());
+ return stage1.getId().toString().compareTo(stage2.getId().toString());
} else {
return result;
}
}
});
- return subQueryList;
+ return stageList;
}
- public static List<SubQueryHistory> sortSubQueryHistory(Collection<SubQueryHistory> subQueries) {
- List<SubQueryHistory> subQueryList = new ArrayList<SubQueryHistory>(subQueries);
- Collections.sort(subQueryList, new Comparator<SubQueryHistory>() {
+ public static List<StageHistory> sortStageHistories(Collection<StageHistory> stages) {
+ List<StageHistory> stageList = new ArrayList<StageHistory>(stages);
+ Collections.sort(stageList, new Comparator<StageHistory>() {
@Override
- public int compare(SubQueryHistory subQuery1, SubQueryHistory subQuery2) {
- long q1StartTime = subQuery1.getStartTime();
- long q2StartTime = subQuery2.getStartTime();
+ public int compare(StageHistory stage1, StageHistory stage2) {
+ long q1StartTime = stage1.getStartTime();
+ long q2StartTime = stage2.getStartTime();
q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime);
q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime);
int result = compareLong(q1StartTime, q2StartTime);
if (result == 0) {
- return subQuery1.getExecutionBlockId().compareTo(subQuery2.getExecutionBlockId());
+ return stage1.getExecutionBlockId().compareTo(stage2.getExecutionBlockId());
} else {
return result;
}
}
});
- return subQueryList;
+ return stageList;
}
public static String getMasterActiveLabel(MasterContext context) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index 9fb427f..932f584 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -210,7 +210,7 @@ public class HistoryReader {
in.readFully(buf, 0, buf.length);
- return SubQueryHistory.fromJsonTasks(new String(buf));
+ return StageHistory.fromJsonTasks(new String(buf));
} finally {
if (in != null) {
in.close();
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index 7e30f9c..5934885 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* <tajo.history.query.dir>/<yyyyMMdd>/query-list/query-list-<HHmmss>.hist (TajoMaster's query list, hourly rolling)
* /query-detail/<QUERY_ID>/query.hist (QueryMaster's query detail)
- * /<EB_ID>.hist (QueryMaster's subquery detail)
+ * /<EB_ID>.hist (QueryMaster's stage detail)
* <tajo.history.task.dir>/<yyyyMMdd>/tasks/<WORKER_HOST>_<WORKER_PORT>/<WORKER_HOST>_<WORKER_PORT>_<HH>_<seq>.hist
* History files are kept for "tajo.history.expiry-time-day" (default value is 7 days)
*/
@@ -267,7 +267,7 @@ public class HistoryWriter extends AbstractService {
// QueryMaster's query detail history (json format)
// <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/query.hist
- // QueryMaster's subquery detail history(proto binary format)
+ // QueryMaster's stage detail history(proto binary format)
// <tajo.query-history.path>/<yyyyMMdd>/query-detail/<QUERY_ID>/<EB_ID>.hist
Path queryHistoryFile = getQueryHistoryFilePath(historyParentPath, queryHistory.getQueryId());
@@ -295,13 +295,13 @@ public class HistoryWriter extends AbstractService {
}
}
- if (queryHistory.getSubQueryHistories() != null) {
- for (SubQueryHistory subQueryHistory : queryHistory.getSubQueryHistories()) {
- Path path = new Path(queryHistoryFile.getParent(), subQueryHistory.getExecutionBlockId() + HISTORY_FILE_POSTFIX);
+ if (queryHistory.getStageHistories() != null) {
+ for (StageHistory stageHistory : queryHistory.getStageHistories()) {
+ Path path = new Path(queryHistoryFile.getParent(), stageHistory.getExecutionBlockId() + HISTORY_FILE_POSTFIX);
out = null;
try {
out = fs.create(path);
- out.write(subQueryHistory.toTasksJson().getBytes());
+ out.write(stageHistory.toTasksJson().getBytes());
LOG.info("Saving query unit: " + path);
} finally {
if (out != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
index 7a81b4b..fdc45a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
@@ -20,8 +20,9 @@ package org.apache.tajo.util.history;
import com.google.gson.annotations.Expose;
import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
-import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto;
+import org.apache.tajo.ipc.ClientProtos.StageHistoryProto;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
@@ -42,7 +43,7 @@ public class QueryHistory implements GsonObject, History {
@Expose
private String distributedPlan;
@Expose
- private List<SubQueryHistory> subQueryHistories;
+ private List<StageHistory> stageHistories;
public String getQueryId() {
return queryId;
@@ -56,8 +57,8 @@ public class QueryHistory implements GsonObject, History {
this.queryMaster = queryMaster;
}
- public void setSubQueryHistories(List<SubQueryHistory> subQueryHistories) {
- this.subQueryHistories = subQueryHistories;
+ public void setStageHistories(List<StageHistory> stageHistories) {
+ this.stageHistories = stageHistories;
}
public String getQueryMaster() {
@@ -72,8 +73,8 @@ public class QueryHistory implements GsonObject, History {
this.httpPort = httpPort;
}
- public List<SubQueryHistory> getSubQueryHistories() {
- return subQueryHistories;
+ public List<StageHistory> getStageHistories() {
+ return stageHistories;
}
public List<String[]> getSessionVariables() {
@@ -138,13 +139,13 @@ public class QueryHistory implements GsonObject, History {
builder.addAllSessionVariables(sessionProtos);
- List<SubQueryHistoryProto> subQueryHistoryProtos = new ArrayList<SubQueryHistoryProto>();
- if (subQueryHistories != null) {
- for (SubQueryHistory eachSubQuery: subQueryHistories) {
- subQueryHistoryProtos.add((eachSubQuery.getProto()));
+ List<StageHistoryProto> stageHistoryProtos = new ArrayList<ClientProtos.StageHistoryProto>();
+ if (stageHistories != null) {
+ for (StageHistory eachStage: stageHistories) {
+ stageHistoryProtos.add((eachStage.getProto()));
}
}
- builder.addAllSubQueryHistories(subQueryHistoryProtos);
+ builder.addAllStageHistories(stageHistoryProtos);
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/util/history/StageHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/StageHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/StageHistory.java
new file mode 100644
index 0000000..e760f86
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/StageHistory.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util.history;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.reflect.TypeToken;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.ipc.ClientProtos.StageHistoryProto;
+import org.apache.tajo.json.GsonObject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StageHistory implements GsonObject {
+ @Expose
+ private String executionBlockId;
+ @Expose
+ private String state;
+ @Expose
+ private long startTime;
+ @Expose
+ private long finishTime;
+ @Expose
+ private int succeededObjectCount;
+ @Expose
+ private int failedObjectCount;
+ @Expose
+ private int killedObjectCount;
+ @Expose
+ private int totalScheduledObjectsCount;
+
+ @Expose
+ private long totalInputBytes;
+ @Expose
+ private long totalReadBytes;
+ @Expose
+ private long totalReadRows;
+ @Expose
+ private long totalWriteBytes;
+ @Expose
+ private long totalWriteRows;
+ @Expose
+ private int numShuffles;
+ @Expose
+ private float progress;
+
+ @Expose
+ private String plan;
+ @Expose
+ private int hostLocalAssigned;
+ @Expose
+ private int rackLocalAssigned;
+
+ private List<TaskHistory> tasks;
+
+ public String getExecutionBlockId() {
+ return executionBlockId;
+ }
+
+ public void setExecutionBlockId(String executionBlockId) {
+ this.executionBlockId = executionBlockId;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public int getSucceededObjectCount() {
+ return succeededObjectCount;
+ }
+
+ public void setSucceededObjectCount(int succeededObjectCount) {
+ this.succeededObjectCount = succeededObjectCount;
+ }
+
+ public int getTotalScheduledObjectsCount() {
+ return totalScheduledObjectsCount;
+ }
+
+ public void setTotalScheduledObjectsCount(int totalScheduledObjectsCount) {
+ this.totalScheduledObjectsCount = totalScheduledObjectsCount;
+ }
+
+ public long getTotalInputBytes() {
+ return totalInputBytes;
+ }
+
+ public void setTotalInputBytes(long totalInputBytes) {
+ this.totalInputBytes = totalInputBytes;
+ }
+
+ public long getTotalReadBytes() {
+ return totalReadBytes;
+ }
+
+ public void setTotalReadBytes(long totalReadBytes) {
+ this.totalReadBytes = totalReadBytes;
+ }
+
+ public long getTotalReadRows() {
+ return totalReadRows;
+ }
+
+ public void setTotalReadRows(long totalReadRows) {
+ this.totalReadRows = totalReadRows;
+ }
+
+ public long getTotalWriteBytes() {
+ return totalWriteBytes;
+ }
+
+ public void setTotalWriteBytes(long totalWriteBytes) {
+ this.totalWriteBytes = totalWriteBytes;
+ }
+
+ public long getTotalWriteRows() {
+ return totalWriteRows;
+ }
+
+ public void setTotalWriteRows(long totalWriteRows) {
+ this.totalWriteRows = totalWriteRows;
+ }
+
+ public int getNumShuffles() {
+ return numShuffles;
+ }
+
+ public void setNumShuffles(int numShuffles) {
+ this.numShuffles = numShuffles;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ public String getPlan() {
+ return plan;
+ }
+
+ public void setPlan(String plan) {
+ this.plan = plan;
+ }
+
+ public int getHostLocalAssigned() {
+ return hostLocalAssigned;
+ }
+
+ public void setHostLocalAssigned(int hostLocalAssigned) {
+ this.hostLocalAssigned = hostLocalAssigned;
+ }
+
+ public int getRackLocalAssigned() {
+ return rackLocalAssigned;
+ }
+
+ public void setRackLocalAssigned(int rackLocalAssigned) {
+ this.rackLocalAssigned = rackLocalAssigned;
+ }
+
+ public int getFailedObjectCount() {
+ return failedObjectCount;
+ }
+
+ public void setFailedObjectCount(int failedObjectCount) {
+ this.failedObjectCount = failedObjectCount;
+ }
+
+ public int getKilledObjectCount() {
+ return killedObjectCount;
+ }
+
+ public void setKilledObjectCount(int killedObjectCount) {
+ this.killedObjectCount = killedObjectCount;
+ }
+
+ public List<TaskHistory> getTasks() {
+ return tasks;
+ }
+
+ public void setTasks(List<TaskHistory> tasks) {
+ this.tasks = tasks;
+ }
+
+ @Override
+ public String toJson() {
+ return CoreGsonHelper.toJson(this, StageHistory.class);
+ }
+
+ public String toTasksJson() {
+ if (tasks == null) {
+ return "";
+ }
+ return CoreGsonHelper.getInstance().toJson(tasks, new TypeToken<List<TaskHistory>>() {
+ }.getType());
+ }
+
+ public static List<TaskHistory> fromJsonTasks(String json) {
+ if (json == null || json.trim().isEmpty()) {
+ return new ArrayList<TaskHistory>();
+ }
+ return CoreGsonHelper.getInstance().fromJson(json, new TypeToken<List<TaskHistory>>() {
+ }.getType());
+ }
+
+ public StageHistoryProto getProto() {
+ StageHistoryProto.Builder builder = StageHistoryProto.newBuilder();
+ builder.setExecutionBlockId(executionBlockId)
+ .setState(state)
+ .setStartTime(startTime)
+ .setFinishTime(finishTime)
+ .setSucceededObjectCount(succeededObjectCount)
+ .setFailedObjectCount(failedObjectCount)
+ .setKilledObjectCount(killedObjectCount)
+ .setTotalScheduledObjectsCount(totalScheduledObjectsCount)
+
+ .setTotalInputBytes(totalInputBytes)
+ .setTotalReadBytes(totalReadBytes)
+ .setTotalReadRows(totalReadRows)
+ .setTotalWriteBytes(totalWriteBytes)
+ .setTotalWriteRows(totalWriteRows)
+ .setNumShuffles(numShuffles)
+ .setProgress(progress)
+
+ .setPlan(plan)
+ .setHostLocalAssigned(hostLocalAssigned)
+ .setRackLocalAssigned(rackLocalAssigned);
+
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
deleted file mode 100644
index 0afdf5a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.util.history;
-
-import com.google.gson.annotations.Expose;
-import com.google.gson.reflect.TypeToken;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto;
-import org.apache.tajo.json.GsonObject;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class SubQueryHistory implements GsonObject {
- @Expose
- private String executionBlockId;
- @Expose
- private String state;
- @Expose
- private long startTime;
- @Expose
- private long finishTime;
- @Expose
- private int succeededObjectCount;
- @Expose
- private int failedObjectCount;
- @Expose
- private int killedObjectCount;
- @Expose
- private int totalScheduledObjectsCount;
-
- @Expose
- private long totalInputBytes;
- @Expose
- private long totalReadBytes;
- @Expose
- private long totalReadRows;
- @Expose
- private long totalWriteBytes;
- @Expose
- private long totalWriteRows;
- @Expose
- private int numShuffles;
- @Expose
- private float progress;
-
- @Expose
- private String plan;
- @Expose
- private int hostLocalAssigned;
- @Expose
- private int rackLocalAssigned;
-
- private List<TaskHistory> tasks;
-
- public String getExecutionBlockId() {
- return executionBlockId;
- }
-
- public void setExecutionBlockId(String executionBlockId) {
- this.executionBlockId = executionBlockId;
- }
-
- public String getState() {
- return state;
- }
-
- public void setState(String state) {
- this.state = state;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- public long getFinishTime() {
- return finishTime;
- }
-
- public void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
- }
-
- public int getSucceededObjectCount() {
- return succeededObjectCount;
- }
-
- public void setSucceededObjectCount(int succeededObjectCount) {
- this.succeededObjectCount = succeededObjectCount;
- }
-
- public int getTotalScheduledObjectsCount() {
- return totalScheduledObjectsCount;
- }
-
- public void setTotalScheduledObjectsCount(int totalScheduledObjectsCount) {
- this.totalScheduledObjectsCount = totalScheduledObjectsCount;
- }
-
- public long getTotalInputBytes() {
- return totalInputBytes;
- }
-
- public void setTotalInputBytes(long totalInputBytes) {
- this.totalInputBytes = totalInputBytes;
- }
-
- public long getTotalReadBytes() {
- return totalReadBytes;
- }
-
- public void setTotalReadBytes(long totalReadBytes) {
- this.totalReadBytes = totalReadBytes;
- }
-
- public long getTotalReadRows() {
- return totalReadRows;
- }
-
- public void setTotalReadRows(long totalReadRows) {
- this.totalReadRows = totalReadRows;
- }
-
- public long getTotalWriteBytes() {
- return totalWriteBytes;
- }
-
- public void setTotalWriteBytes(long totalWriteBytes) {
- this.totalWriteBytes = totalWriteBytes;
- }
-
- public long getTotalWriteRows() {
- return totalWriteRows;
- }
-
- public void setTotalWriteRows(long totalWriteRows) {
- this.totalWriteRows = totalWriteRows;
- }
-
- public int getNumShuffles() {
- return numShuffles;
- }
-
- public void setNumShuffles(int numShuffles) {
- this.numShuffles = numShuffles;
- }
-
- public float getProgress() {
- return progress;
- }
-
- public void setProgress(float progress) {
- this.progress = progress;
- }
-
- public String getPlan() {
- return plan;
- }
-
- public void setPlan(String plan) {
- this.plan = plan;
- }
-
- public int getHostLocalAssigned() {
- return hostLocalAssigned;
- }
-
- public void setHostLocalAssigned(int hostLocalAssigned) {
- this.hostLocalAssigned = hostLocalAssigned;
- }
-
- public int getRackLocalAssigned() {
- return rackLocalAssigned;
- }
-
- public void setRackLocalAssigned(int rackLocalAssigned) {
- this.rackLocalAssigned = rackLocalAssigned;
- }
-
- public int getFailedObjectCount() {
- return failedObjectCount;
- }
-
- public void setFailedObjectCount(int failedObjectCount) {
- this.failedObjectCount = failedObjectCount;
- }
-
- public int getKilledObjectCount() {
- return killedObjectCount;
- }
-
- public void setKilledObjectCount(int killedObjectCount) {
- this.killedObjectCount = killedObjectCount;
- }
-
- public List<TaskHistory> getTasks() {
- return tasks;
- }
-
- public void setTasks(List<TaskHistory> tasks) {
- this.tasks = tasks;
- }
-
- @Override
- public String toJson() {
- return CoreGsonHelper.toJson(this, SubQueryHistory.class);
- }
-
- public String toTasksJson() {
- if (tasks == null) {
- return "";
- }
- return CoreGsonHelper.getInstance().toJson(tasks, new TypeToken<List<TaskHistory>>() {
- }.getType());
- }
-
- public static List<TaskHistory> fromJsonTasks(String json) {
- if (json == null || json.trim().isEmpty()) {
- return new ArrayList<TaskHistory>();
- }
- return CoreGsonHelper.getInstance().fromJson(json, new TypeToken<List<TaskHistory>>() {
- }.getType());
- }
-
- public SubQueryHistoryProto getProto() {
- SubQueryHistoryProto.Builder builder = SubQueryHistoryProto.newBuilder();
- builder.setExecutionBlockId(executionBlockId)
- .setState(state)
- .setStartTime(startTime)
- .setFinishTime(finishTime)
- .setSucceededObjectCount(succeededObjectCount)
- .setFailedObjectCount(failedObjectCount)
- .setKilledObjectCount(killedObjectCount)
- .setTotalScheduledObjectsCount(totalScheduledObjectsCount)
-
- .setTotalInputBytes(totalInputBytes)
- .setTotalReadBytes(totalReadBytes)
- .setTotalReadRows(totalReadRows)
- .setTotalWriteBytes(totalWriteBytes)
- .setTotalWriteRows(totalWriteRows)
- .setNumShuffles(numShuffles)
- .setProgress(progress)
-
- .setPlan(plan)
- .setHostLocalAssigned(hostLocalAssigned)
- .setRackLocalAssigned(rackLocalAssigned);
-
- return builder.build();
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index f055733..8944eae 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -37,10 +37,10 @@ import org.apache.tajo.master.container.TajoContainer;
import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
-import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+import org.apache.tajo.master.event.StageContainerAllocationEvent;
import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.SubQuery;
-import org.apache.tajo.master.querymaster.SubQueryState;
+import org.apache.tajo.master.querymaster.Stage;
+import org.apache.tajo.master.querymaster.StageState;
import org.apache.tajo.master.rm.*;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
@@ -352,8 +352,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
containers.add(container);
}
- SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getSynchronizedState();
- if (!SubQuery.isRunningState(state)) {
+ StageState state = queryTaskContext.getStage(executionBlockId).getSynchronizedState();
+ if (!Stage.isRunningState(state)) {
try {
List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>();
for(TajoContainer eachContainer: containers) {
@@ -368,9 +368,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
if (allocatedResources.size() > 0) {
if(LOG.isDebugEnabled()) {
- LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
+ LOG.debug("StageContainerAllocationEvent fire:" + executionBlockId);
}
- queryTaskContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
+ queryTaskContext.getEventHandler().handle(new StageContainerAllocationEvent(executionBlockId, containers));
}
numAllocatedContainers += allocatedResources.size();