Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:17:26 UTC
[09/16] tajo git commit: TAJO-1288: Refactoring org.apache.tajo.master package.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
deleted file mode 100644
index 0515e72..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java
+++ /dev/null
@@ -1,1342 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
-import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.*;
-import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.master.querymaster.Task.IntermediateEntry;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.storage.FileStorageManager;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.util.history.StageHistory;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-
-
-/**
- * A Stage controls the execution of a single ExecutionBlock and is modeled as a finite state machine.
- */
-public class Stage implements EventHandler<StageEvent> {
-
- private static final Log LOG = LogFactory.getLog(Stage.class);
-
- private MasterPlan masterPlan;
- private ExecutionBlock block;
- private int priority;
- private Schema schema;
- private TableMeta meta;
- private TableStats resultStatistics;
- private TableStats inputStatistics;
- private EventHandler<Event> eventHandler;
- private AbstractTaskScheduler taskScheduler;
- private QueryMasterTask.QueryMasterTaskContext context;
- private final List<String> diagnostics = new ArrayList<String>();
- private StageState stageState;
-
- private long startTime;
- private long finishTime;
-
- volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>();
- volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId,
- TajoContainer>();
-
- private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
- private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
- private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
- private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
- private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
- new AllocatedContainersCancelTransition();
- private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition();
- private StateMachine<StageState, StageEventType, StageEvent> stateMachine;
-
- protected static final StateMachineFactory<Stage, StageState,
- StageEventType, StageEvent> stateMachineFactory =
- new StateMachineFactory <Stage, StageState,
- StageEventType, StageEvent> (StageState.NEW)
-
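- // Each addTransition(preState, postState(s), eventType[, hook]) below registers one arc of the
- // state machine; hooks of type MultipleArcTransition select the actual post state at runtime.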
- // Transitions from NEW state
- .addTransition(StageState.NEW,
- EnumSet.of(StageState.INITED, StageState.ERROR, StageState.SUCCEEDED),
- StageEventType.SQ_INIT,
- new InitAndRequestContainer())
- .addTransition(StageState.NEW, StageState.NEW,
- StageEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(StageState.NEW, StageState.KILLED,
- StageEventType.SQ_KILL)
- .addTransition(StageState.NEW, StageState.ERROR,
- StageEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from INITED state
- .addTransition(StageState.INITED, StageState.RUNNING,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINER_LAUNCH_TRANSITION)
- .addTransition(StageState.INITED, StageState.INITED,
- StageEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(StageState.INITED, StageState.KILL_WAIT,
- StageEventType.SQ_KILL, new KillTasksTransition())
- .addTransition(StageState.INITED, StageState.ERROR,
- StageEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from RUNNING state
- .addTransition(StageState.RUNNING, StageState.RUNNING,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINER_LAUNCH_TRANSITION)
- .addTransition(StageState.RUNNING, StageState.RUNNING,
- StageEventType.SQ_TASK_COMPLETED,
- TASK_COMPLETED_TRANSITION)
- .addTransition(StageState.RUNNING,
- EnumSet.of(StageState.SUCCEEDED, StageState.FAILED),
- StageEventType.SQ_STAGE_COMPLETED,
- STAGE_COMPLETED_TRANSITION)
- .addTransition(StageState.RUNNING, StageState.RUNNING,
- StageEventType.SQ_FAILED,
- TASK_COMPLETED_TRANSITION)
- .addTransition(StageState.RUNNING, StageState.RUNNING,
- StageEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(StageState.RUNNING, StageState.KILL_WAIT,
- StageEventType.SQ_KILL,
- new KillTasksTransition())
- .addTransition(StageState.RUNNING, StageState.ERROR,
- StageEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able Transition
- .addTransition(StageState.RUNNING, StageState.RUNNING,
- StageEventType.SQ_START)
-
- // Transitions from KILL_WAIT state
- .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
- EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition())
- .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
- StageEventType.SQ_TASK_COMPLETED,
- TASK_COMPLETED_TRANSITION)
- .addTransition(StageState.KILL_WAIT,
- EnumSet.of(StageState.SUCCEEDED, StageState.FAILED, StageState.KILLED),
- StageEventType.SQ_STAGE_COMPLETED,
- STAGE_COMPLETED_TRANSITION)
- .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
- StageEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT,
- StageEventType.SQ_FAILED,
- TASK_COMPLETED_TRANSITION)
- .addTransition(StageState.KILL_WAIT, StageState.ERROR,
- StageEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from SUCCEEDED state
- .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
- StageEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(StageState.SUCCEEDED, StageState.ERROR,
- StageEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able events
- .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED,
- EnumSet.of(
- StageEventType.SQ_START,
- StageEventType.SQ_KILL,
- StageEventType.SQ_CONTAINER_ALLOCATED))
-
- // Transitions from KILLED state
- .addTransition(StageState.KILLED, StageState.KILLED,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(StageState.KILLED, StageState.KILLED,
- StageEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(StageState.KILLED, StageState.ERROR,
- StageEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able transitions
- .addTransition(StageState.KILLED, StageState.KILLED,
- EnumSet.of(
- StageEventType.SQ_START,
- StageEventType.SQ_KILL,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- StageEventType.SQ_FAILED))
-
- // Transitions from FAILED state
- .addTransition(StageState.FAILED, StageState.FAILED,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(StageState.FAILED, StageState.FAILED,
- StageEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(StageState.FAILED, StageState.ERROR,
- StageEventType.SQ_INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able transitions
- .addTransition(StageState.FAILED, StageState.FAILED,
- EnumSet.of(
- StageEventType.SQ_START,
- StageEventType.SQ_KILL,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- StageEventType.SQ_FAILED))
-
- // Transitions from ERROR state
- .addTransition(StageState.ERROR, StageState.ERROR,
- StageEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINERS_CANCEL_TRANSITION)
- .addTransition(StageState.ERROR, StageState.ERROR,
- StageEventType.SQ_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- // Ignore-able transitions
- .addTransition(StageState.ERROR, StageState.ERROR,
- EnumSet.of(
- StageEventType.SQ_START,
- StageEventType.SQ_KILL,
- StageEventType.SQ_FAILED,
- StageEventType.SQ_INTERNAL_ERROR,
- StageEventType.SQ_STAGE_COMPLETED))
-
- .installTopology();
-
- private final Lock readLock;
- private final Lock writeLock;
-
- private int totalScheduledObjectsCount;
- private int succeededObjectCount = 0;
- private int completedTaskCount = 0;
- private int succeededTaskCount = 0;
- private int killedObjectCount = 0;
- private int failedObjectCount = 0;
- private TaskSchedulerContext schedulerContext;
- private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
- private AtomicInteger completeReportReceived = new AtomicInteger(0);
- private StageHistory finalStageHistory;
-
- public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
- this.context = context;
- this.masterPlan = masterPlan;
- this.block = block;
- this.eventHandler = context.getEventHandler();
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
- stateMachine = stateMachineFactory.make(this);
- stageState = stateMachine.getCurrentState();
- }
-
- public static boolean isRunningState(StageState state) {
- return state == StageState.INITED || state == StageState.NEW || state == StageState.RUNNING;
- }
-
- public QueryMasterTask.QueryMasterTaskContext getContext() {
- return context;
- }
-
- public MasterPlan getMasterPlan() {
- return masterPlan;
- }
-
- public DataChannel getDataChannel() {
- return masterPlan.getOutgoingChannels(getId()).iterator().next();
- }
-
- public EventHandler<Event> getEventHandler() {
- return eventHandler;
- }
-
- public AbstractTaskScheduler getTaskScheduler() {
- return taskScheduler;
- }
-
- public void setStartTime() {
- startTime = context.getClock().getTime();
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public long getStartTime() {
- return this.startTime;
- }
-
- public void setFinishTime() {
- finishTime = context.getClock().getTime();
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public long getFinishTime() {
- return this.finishTime;
- }
-
- public float getTaskProgress() {
- readLock.lock();
- try {
- if (getState() == StageState.NEW) {
- return 0;
- } else {
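- // Task progress is the fraction of all scheduled objects that have succeeded so far.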
- return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
- }
- } finally {
- readLock.unlock();
- }
- }
-
- public float getProgress() {
- List<Task> tempTasks = null;
- readLock.lock();
- try {
- if (getState() == StageState.NEW) {
- return 0.0f;
- } else {
- tempTasks = new ArrayList<Task>(tasks.values());
- }
- } finally {
- readLock.unlock();
- }
-
- float totalProgress = 0.0f;
- for (Task eachTask : tempTasks) {
- if (eachTask.getLastAttempt() != null) {
- totalProgress += eachTask.getLastAttempt().getProgress();
- }
- }
-
- if (totalProgress > 0.0f) {
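- // Truncate (not round) the averaged attempt progress to three decimal places.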
- return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f;
- } else {
- return 0.0f;
- }
- }
-
- public int getSucceededObjectCount() {
- return succeededObjectCount;
- }
-
- public int getTotalScheduledObjectsCount() {
- return totalScheduledObjectsCount;
- }
-
- public ExecutionBlock getBlock() {
- return block;
- }
-
- public void addTask(Task task) {
- tasks.put(task.getId(), task);
- }
-
- public StageHistory getStageHistory() {
- if (finalStageHistory != null) {
- if (finalStageHistory.getFinishTime() == 0) {
- finalStageHistory = makeStageHistory();
- finalStageHistory.setTasks(makeTaskHistories());
- }
- return finalStageHistory;
- } else {
- return makeStageHistory();
- }
- }
-
- private List<TaskHistory> makeTaskHistories() {
- List<TaskHistory> taskHistories = new ArrayList<TaskHistory>();
-
- for(Task eachTask : getTasks()) {
- taskHistories.add(eachTask.getTaskHistory());
- }
-
- return taskHistories;
- }
-
- private StageHistory makeStageHistory() {
- StageHistory stageHistory = new StageHistory();
-
- stageHistory.setExecutionBlockId(getId().toString());
- stageHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan()));
- stageHistory.setState(getState().toString());
- stageHistory.setStartTime(startTime);
- stageHistory.setFinishTime(finishTime);
- stageHistory.setSucceededObjectCount(succeededObjectCount);
- stageHistory.setKilledObjectCount(killedObjectCount);
- stageHistory.setFailedObjectCount(failedObjectCount);
- stageHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount);
- stageHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned());
- stageHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned());
-
- long totalInputBytes = 0;
- long totalReadBytes = 0;
- long totalReadRows = 0;
- long totalWriteBytes = 0;
- long totalWriteRows = 0;
- int numShuffles = 0;
- for(Task eachTask : getTasks()) {
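- // Every task in a stage is assumed to report the same shuffle output number, so the
- // value observed on the last task is used as the stage-wide count.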
- numShuffles = eachTask.getShuffleOutpuNum();
- if (eachTask.getLastAttempt() != null) {
- TableStats inputStats = eachTask.getLastAttempt().getInputStats();
- if (inputStats != null) {
- totalInputBytes += inputStats.getNumBytes();
- totalReadBytes += inputStats.getReadBytes();
- totalReadRows += inputStats.getNumRows();
- }
- TableStats outputStats = eachTask.getLastAttempt().getResultStats();
- if (outputStats != null) {
- totalWriteBytes += outputStats.getNumBytes();
- totalWriteRows += outputStats.getNumRows();
- }
- }
- }
-
- stageHistory.setTotalInputBytes(totalInputBytes);
- stageHistory.setTotalReadBytes(totalReadBytes);
- stageHistory.setTotalReadRows(totalReadRows);
- stageHistory.setTotalWriteBytes(totalWriteBytes);
- stageHistory.setTotalWriteRows(totalWriteRows);
- stageHistory.setNumShuffles(numShuffles);
- stageHistory.setProgress(getProgress());
- return stageHistory;
- }
-
- /**
- * It finalizes this stage. It is only invoked when the stage has succeeded.
- */
- public void complete() {
- cleanup();
- finalizeStats();
- setFinishTime();
- eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED));
- }
-
- /**
- * It finalizes this stage. Unlike {@link Stage#complete()},
- * it is invoked when a stage finishes abnormally.
- *
- * @param finalState The final stage state
- */
- public void abort(StageState finalState) {
- // TODO -
- // - committer.abortStage(...)
- // - record Stage Finish Time
- // - CleanUp Tasks
- // - Record History
- cleanup();
- setFinishTime();
- eventHandler.handle(new StageCompletedEvent(getId(), finalState));
- }
-
- public StateMachine<StageState, StageEventType, StageEvent> getStateMachine() {
- return this.stateMachine;
- }
-
- public void setPriority(int priority) {
- this.priority = priority;
- }
-
-
- public int getPriority() {
- return this.priority;
- }
-
- public ExecutionBlockId getId() {
- return block.getId();
- }
-
- public Task[] getTasks() {
- return tasks.values().toArray(new Task[tasks.size()]);
- }
-
- public Task getTask(TaskId qid) {
- return tasks.get(qid);
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- public TableMeta getTableMeta() {
- return meta;
- }
-
- public TableStats getResultStats() {
- return resultStatistics;
- }
-
- public TableStats getInputStats() {
- return inputStatistics;
- }
-
- public List<String> getDiagnostics() {
- readLock.lock();
- try {
- return diagnostics;
- } finally {
- readLock.unlock();
- }
- }
-
- protected void addDiagnostic(String diag) {
- diagnostics.add(diag);
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(this.getId());
- return sb.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof Stage) {
- Stage other = (Stage)o;
- return getId().equals(other.getId());
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return getId().hashCode();
- }
-
- public int compareTo(Stage other) {
- return getId().compareTo(other.getId());
- }
-
- public StageState getSynchronizedState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- /* non-blocking call for client API */
- public StageState getState() {
- return stageState;
- }
-
- public static TableStats[] computeStatFromUnionBlock(Stage stage) {
- TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
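- // Index 0 aggregates the children's input statistics; index 1 aggregates their result statistics.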
- long[] avgRows = new long[]{0, 0};
- long[] numBytes = new long[]{0, 0};
- long[] readBytes = new long[]{0, 0};
- long[] numRows = new long[]{0, 0};
- int[] numBlocks = new int[]{0, 0};
- int[] numOutputs = new int[]{0, 0};
-
- List<ColumnStats> columnStatses = Lists.newArrayList();
-
- MasterPlan masterPlan = stage.getMasterPlan();
- Iterator<ExecutionBlock> it = masterPlan.getChilds(stage.getBlock()).iterator();
- while (it.hasNext()) {
- ExecutionBlock block = it.next();
- Stage childStage = stage.context.getStage(block.getId());
- TableStats[] childStatArray = new TableStats[]{
- childStage.getInputStats(), childStage.getResultStats()
- };
- for (int i = 0; i < 2; i++) {
- if (childStatArray[i] == null) {
- continue;
- }
- avgRows[i] += childStatArray[i].getAvgRows();
- numBlocks[i] += childStatArray[i].getNumBlocks();
- numBytes[i] += childStatArray[i].getNumBytes();
- readBytes[i] += childStatArray[i].getReadBytes();
- numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
- numRows[i] += childStatArray[i].getNumRows();
- }
- if (childStatArray[1] != null) {
- columnStatses.addAll(childStatArray[1].getColumnStats());
- }
- }
-
- for (int i = 0; i < 2; i++) {
- stat[i].setNumBlocks(numBlocks[i]);
- stat[i].setNumBytes(numBytes[i]);
- stat[i].setReadBytes(readBytes[i]);
- stat[i].setNumShuffleOutputs(numOutputs[i]);
- stat[i].setNumRows(numRows[i]);
- stat[i].setAvgRows(avgRows[i]);
- }
- stat[1].setColumnStats(columnStatses);
-
- return stat;
- }
-
- private TableStats[] computeStatFromTasks() {
- List<TableStats> inputStatsList = Lists.newArrayList();
- List<TableStats> resultStatsList = Lists.newArrayList();
- for (Task unit : getTasks()) {
- resultStatsList.add(unit.getStats());
- if (unit.getLastAttempt().getInputStats() != null) {
- inputStatsList.add(unit.getLastAttempt().getInputStats());
- }
- }
- TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
- TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
- return new TableStats[]{inputStats, resultStats};
- }
-
- private void stopScheduler() {
- // If there are launched TaskRunners, send the 'shouldDie' message to all of them
- // via received task requests.
- if (taskScheduler != null) {
- taskScheduler.stop();
- }
- }
-
- private void releaseContainers() {
- // If there are still live TaskRunners, try to kill the containers.
- eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
- }
-
- /**
- * It computes all stats and sets the intermediate result.
- */
- private void finalizeStats() {
- TableStats[] statsArray;
- if (block.hasUnion()) {
- statsArray = computeStatFromUnionBlock(this);
- } else {
- statsArray = computeStatFromTasks();
- }
-
- DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
-
- // if store plan (i.e., CREATE or INSERT OVERWRITE)
- StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
- if (storeType == null) {
- // fall back to the default store type
- storeType = StoreType.CSV;
- }
-
- schema = channel.getSchema();
- meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
- inputStatistics = statsArray[0];
- resultStatistics = statsArray[1];
- }
-
- @Override
- public void handle(StageEvent event) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState="
- + getSynchronizedState());
- }
-
- try {
- writeLock.lock();
- StageState oldState = getSynchronizedState();
- try {
- getStateMachine().doTransition(event.getType(), event);
- stageState = getSynchronizedState();
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state"
- + ", eventType:" + event.getType().name()
- + ", oldState:" + oldState.name()
- + ", nextState:" + getSynchronizedState().name()
- , e);
- eventHandler.handle(new StageEvent(getId(),
- StageEventType.SQ_INTERNAL_ERROR));
- }
-
- // log the state change, if any
- if (LOG.isDebugEnabled()) {
- if (oldState != getSynchronizedState()) {
- LOG.debug(getId() + " Stage Transitioned from " + oldState + " to "
- + getSynchronizedState());
- }
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public void handleTaskRequestEvent(TaskRequestEvent event) {
- taskScheduler.handleTaskRequestEvent(event);
- }
-
- private static class InitAndRequestContainer implements MultipleArcTransition<Stage,
- StageEvent, StageState> {
-
- @Override
- public StageState transition(final Stage stage, StageEvent stageEvent) {
- stage.setStartTime();
- ExecutionBlock execBlock = stage.getBlock();
- StageState state;
-
- try {
- // Union operator does not require actual query processing. It is performed logically.
- if (execBlock.hasUnion()) {
- stage.finalizeStats();
- state = StageState.SUCCEEDED;
- } else {
- ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
- DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
- setShuffleIfNecessary(stage, channel);
- initTaskScheduler(stage);
- // execute pre-processing asynchronously
- stage.getContext().getQueryMasterContext().getEventExecutor()
- .submit(new Runnable() {
- @Override
- public void run() {
- try {
- schedule(stage);
- stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum();
- LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled");
-
- if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there are no tasks
- stage.complete();
- } else {
- if(stage.getSynchronizedState() == StageState.INITED) {
- stage.taskScheduler.start();
- allocateContainers(stage);
- } else {
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
- }
- }
- } catch (Throwable e) {
- LOG.error("Stage (" + stage.getId() + ") ERROR: ", e);
- stage.setFinishTime();
- stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage()));
- stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR));
- }
- }
- }
- );
- state = StageState.INITED;
- }
- } catch (Throwable e) {
- LOG.error("Stage (" + stage.getId() + ") ERROR: ", e);
- stage.setFinishTime();
- stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage()));
- stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR));
- return StageState.ERROR;
- }
-
- return state;
- }
-
- private void initTaskScheduler(Stage stage) throws IOException {
- TajoConf conf = stage.context.getConf();
- stage.schedulerContext = new TaskSchedulerContext(stage.context,
- stage.getMasterPlan().isLeaf(stage.getId()), stage.getId());
- stage.taskScheduler = TaskSchedulerFactory.get(conf, stage.schedulerContext, stage);
- stage.taskScheduler.init(conf);
- LOG.info(stage.taskScheduler.getName() + " is chosen for the task scheduling for " + stage.getId());
- }
-
- /**
- * If a parent block requires a repartition operation, this method sets the proper repartition
- * method and the number of partitions for the given Stage.
- */
- private static void setShuffleIfNecessary(Stage stage, DataChannel channel) {
- if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
- int numTasks = calculateShuffleOutputNum(stage, channel);
- Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel);
- }
- }
-
- /**
- * Gets the total memory of the cluster.
- *
- * @param stage
- * @return the total memory in megabytes
- */
- private static int getClusterTotalMemory(Stage stage) {
- List<TajoMasterProtocol.WorkerResourceProto> workers =
- stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
-
- int totalMem = 0;
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
- totalMem += worker.getMemoryMB();
- }
- return totalMem;
- }
- /**
- * Gets the desired number of partitions according to the volume of input data.
- * This method is only used to determine the number of partitions for a hash join or aggregation.
- *
- * @param stage
- * @return the desired number of partitions
- */
- public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) {
- TajoConf conf = stage.context.getConf();
- MasterPlan masterPlan = stage.getMasterPlan();
- ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
-
- LogicalNode grpNode = null;
- if (parent != null) {
- grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
- if (grpNode == null) {
- grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY);
- }
- }
-
- // We assume this execution block is the first stage of a join if two or more tables are included in it.
- if (parent != null && parent.getScanNodes().length >= 2) {
- List<ExecutionBlock> childs = masterPlan.getChilds(parent);
-
- // for outer
- ExecutionBlock outer = childs.get(0);
- long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer);
-
- // for inner
- ExecutionBlock inner = childs.get(1);
- long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner);
- LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
- + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
-
- long bigger = Math.max(outerVolume, innerVolume);
-
- int mb = (int) Math.ceil((double) bigger / 1048576);
- LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
-
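- // For example, a 1,024 MB bigger side with a 128 MB JOIN_PER_SHUFFLE_SIZE
- // yields ceil(1024 / 128) = 8 join partitions.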
- int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
-
- if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
- taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
- LOG.warn("!!!!! TESTCASE MODE !!!!!");
- }
-
- // The shuffle output numbers of a join may be inconsistent depending on the execution block order.
- // Thus, we compare the computed task number with the DataChannel output numbers; if all three
- // disagree, the larger of the two channel output numbers is used instead.
- int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0;
- for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
- outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum());
- }
- for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
- innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
- }
- if (outerShuffleOutputNum != innerShuffleOutputNum
- && taskNum != outerShuffleOutputNum
- && taskNum != innerShuffleOutputNum) {
- LOG.info(stage.getId() + ", Changed the determined number of join partitions because the shuffle output numbers differ" +
- ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) +
- ", outerShuffleOutputNum=" + outerShuffleOutputNum +
- ", innerShuffleOutputNum=" + innerShuffleOutputNum);
- taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum);
- }
-
- LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum);
-
- return taskNum;
- // Is this stage the first step of group-by?
- } else if (grpNode != null) {
- boolean hasGroupColumns = true;
- if (grpNode.getType() == NodeType.GROUP_BY) {
- hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
- } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
- // Find current distinct stage node.
- DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
- if (distinctNode == null) {
- LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
- distinctNode = (DistinctGroupbyNode)grpNode;
- }
- hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
-
- Enforcer enforcer = stage.getBlock().getEnforcer();
- if (enforcer == null) {
- LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null.");
- }
- EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
- if (property != null) {
- if (property.getDistinct().getIsMultipleAggregation()) {
- MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage();
- if (multiAggStage != MultipleAggregationStage.THRID_STAGE) {
- hasGroupColumns = true;
- }
- }
- }
- }
- if (!hasGroupColumns) {
- LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
- return 1;
- } else {
- long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
-
- int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB);
- LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
- // determine the number of tasks
- int taskNum = (int) Math.ceil((double) volumeByMB /
- masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
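- // For example, 512 MB of input with a 256 MB GROUPBY_PER_SHUFFLE_SIZE yields ceil(512 / 256) = 2 partitions.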
- LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum);
- return taskNum;
- }
- } else {
- LOG.info("============>>>>> Unexpected Case! <<<<<================");
- long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
-
- int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
- // determine the number of tasks, one per 128 MB
- int taskNum = (int) Math.ceil((double)mb / 128);
- LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum);
- return taskNum;
- }
- }
-
- private static void schedule(Stage stage) throws IOException {
- MasterPlan masterPlan = stage.getMasterPlan();
- ExecutionBlock execBlock = stage.getBlock();
- if (stage.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
- scheduleFragmentsForLeafQuery(stage);
- } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
- Repartitioner.scheduleFragmentsForJoinQuery(stage.schedulerContext, stage);
- } else { // Case 3: Others (Sort or Aggregation)
- int numTasks = getNonLeafTaskNum(stage);
- Repartitioner.scheduleFragmentsForNonLeafTasks(stage.schedulerContext, masterPlan, stage, numTasks);
- }
- }
-
- /**
- * Gets the desired number of tasks according to the volume of input data.
- *
- * @param stage
- * @return the determined number of non-leaf tasks
- */
- public static int getNonLeafTaskNum(Stage stage) {
- // Getting intermediate data size
- long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock());
-
- int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
- // determine the number of tasks, one per 64 MB
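- // For example, 1,024 MB of intermediate data yields max(1, ceil(1024 / 64)) = 16 tasks.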
- int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
- LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum);
- return maxTaskNum;
- }
-
- public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context,
- ExecutionBlock execBlock) {
- Map<String, TableDesc> tableMap = context.getTableDescMap();
- if (masterPlan.isLeaf(execBlock)) {
- ScanNode[] outerScans = execBlock.getScanNodes();
- long maxVolume = 0;
- for (ScanNode eachScanNode: outerScans) {
- TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats();
- if (stat.getNumBytes() > maxVolume) {
- maxVolume = stat.getNumBytes();
- }
- }
- return maxVolume;
- } else {
- long aggregatedVolume = 0;
- for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
- Stage stage = context.getStage(childBlock.getId());
- if (stage == null || stage.getSynchronizedState() != StageState.SUCCEEDED) {
- aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
- } else {
- aggregatedVolume += stage.getResultStats().getNumBytes();
- }
- }
-
- return aggregatedVolume;
- }
- }
-
- public static void allocateContainers(Stage stage) {
- ExecutionBlock execBlock = stage.getBlock();
-
- //TODO consider disk slot
- int requiredMemoryMBPerTask = 512;
-
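- // The container request count is derived from the estimated task number and the fixed per-task memory.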
- int numRequest = stage.getContext().getResourceAllocator().calculateNumRequestContainers(
- stage.getContext().getQueryMasterContext().getWorkerContext(),
- stage.schedulerContext.getEstimatedTaskNum(),
- requiredMemoryMBPerTask
- );
-
- final Resource resource = Records.newRecord(Resource.class);
-
- resource.setMemory(requiredMemoryMBPerTask);
-
- LOG.info("Request Container for " + stage.getId() + " containers=" + numRequest);
-
- Priority priority = Records.newRecord(Priority.class);
- priority.setPriority(stage.getPriority());
- ContainerAllocationEvent event =
- new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
- stage.getId(), priority, resource, numRequest,
- stage.masterPlan.isLeaf(execBlock), 0.0f);
- stage.eventHandler.handle(event);
- }
-
- private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOException {
- ExecutionBlock execBlock = stage.getBlock();
- ScanNode[] scans = execBlock.getScanNodes();
- Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
- ScanNode scan = scans[0];
- TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName());
-
- Collection<Fragment> fragments;
- TableMeta meta = table.getMeta();
-
- // Depending on the scan node's type, this creates fragments. If the scan is on
- // a partitioned table, it creates fragments for all partitions. Otherwise, it
- // creates at least one fragment for the table, which may span a number of
- // blocks or possibly consist of a number of files.
- if (scan.getType() == NodeType.PARTITIONS_SCAN) {
- // After calling this method, partition paths are removed from the physical plan.
- FileStorageManager storageManager =
- (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf());
- fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table);
- } else {
- StorageManager storageManager =
- StorageManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType());
- fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan);
- }
-
- Stage.scheduleFragments(stage, fragments);
- if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) {
- // For DefaultTaskScheduler, the number of leaf tasks should equal the number of fragments;
- // estimatedTaskNum determines the number of initial containers.
- stage.schedulerContext.setEstimatedTaskNum(fragments.size());
- } else {
- TajoConf conf = stage.context.getConf();
- stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
- int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
- (double) stage.schedulerContext.getTaskSize());
- stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
- }
- }
- }
-
- public static void scheduleFragment(Stage stage, Fragment fragment) {
- stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
- stage.getId(), fragment));
- }
-
-
- public static void scheduleFragments(Stage stage, Collection<Fragment> fragments) {
- for (Fragment eachFragment : fragments) {
- scheduleFragment(stage, eachFragment);
- }
- }
-
- public static void scheduleFragments(Stage stage, Collection<Fragment> leftFragments,
- Collection<Fragment> broadcastFragments) {
- for (Fragment eachLeafFragment : leftFragments) {
- scheduleFragment(stage, eachLeafFragment, broadcastFragments);
- }
- }
-
- public static void scheduleFragment(Stage stage,
- Fragment leftFragment, Collection<Fragment> rightFragments) {
- stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
- stage.getId(), leftFragment, rightFragments));
- }
-
- public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) {
- stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
- stage.getId(), fetches));
- }
-
- public static Task newEmptyTask(TaskSchedulerContext schedulerContext,
- TaskAttemptScheduleContext taskContext,
- Stage stage, int taskId) {
- ExecutionBlock execBlock = stage.getBlock();
- Task unit = new Task(schedulerContext.getMasterContext().getConf(),
- taskContext,
- QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId),
- schedulerContext.isLeafQuery(), stage.eventHandler);
- unit.setLogicalPlan(execBlock.getPlan());
- stage.addTask(unit);
- return unit;
- }
-
- private static class ContainerLaunchTransition
- implements SingleArcTransition<Stage, StageEvent> {
-
- @Override
- public void transition(Stage stage, StageEvent event) {
- try {
- StageContainerAllocationEvent allocationEvent =
- (StageContainerAllocationEvent) event;
- for (TajoContainer container : allocationEvent.getAllocatedContainer()) {
- TajoContainerId cId = container.getId();
- if (stage.containers.containsKey(cId)) {
- stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
- "Duplicated containers are allocated: " + cId.toString()));
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
- }
- stage.containers.put(cId, container);
- }
- LOG.info("Stage (" + stage.getId() + ") has " + stage.containers.size() + " containers!");
- stage.eventHandler.handle(
- new LaunchTaskRunnersEvent(stage.getId(), allocationEvent.getAllocatedContainer(),
- stage.getContext().getQueryContext(),
- CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class))
- );
-
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START));
- } catch (Throwable t) {
- stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
- ExceptionUtils.getStackTrace(t)));
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
- }
- }
- }
-
- /**
- * It is used in the KILL_WAIT state against a Container Allocated event.
- * It just returns the allocated containers to the resource manager.
- */
- private static class AllocatedContainersCancelTransition implements SingleArcTransition<Stage, StageEvent> {
- @Override
- public void transition(Stage stage, StageEvent event) {
- try {
- StageContainerAllocationEvent allocationEvent =
- (StageContainerAllocationEvent) event;
- stage.eventHandler.handle(
- new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
- stage.getId(), allocationEvent.getAllocatedContainer()));
- LOG.info(String.format("[%s] %d allocated containers are canceled",
- stage.getId().toString(),
- allocationEvent.getAllocatedContainer().size()));
- } catch (Throwable t) {
- stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(),
- ExceptionUtils.getStackTrace(t)));
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR));
- }
- }
- }
-
- private static class TaskCompletedTransition implements SingleArcTransition<Stage, StageEvent> {
-
- @Override
- public void transition(Stage stage,
- StageEvent event) {
- StageTaskEvent taskEvent = (StageTaskEvent) event;
- Task task = stage.getTask(taskEvent.getTaskId());
-
- if (task == null) { // unknown task; treat it as a stage failure
- LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED));
- } else {
- stage.completedTaskCount++;
-
- if (taskEvent.getState() == TaskState.SUCCEEDED) {
- stage.succeededObjectCount++;
- } else if (task.getState() == TaskState.KILLED) {
- stage.killedObjectCount++;
- } else if (task.getState() == TaskState.FAILED) {
- stage.failedObjectCount++;
- // if at least one task fails, try to kill all tasks.
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
- }
-
- LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
- stage.getId(),
- stage.getTotalScheduledObjectsCount(),
- stage.succeededObjectCount,
- stage.killedObjectCount,
- stage.failedObjectCount));
-
- if (stage.totalScheduledObjectsCount ==
- stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) {
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED));
- }
- }
- }
- }
-
- private static class KillTasksTransition implements SingleArcTransition<Stage, StageEvent> {
-
- @Override
- public void transition(Stage stage, StageEvent stageEvent) {
- if(stage.getTaskScheduler() != null){
- stage.getTaskScheduler().stop();
- }
-
- for (Task task : stage.getTasks()) {
- stage.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL));
- }
- }
- }
-
- private void cleanup() {
- stopScheduler();
- releaseContainers();
-
- if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
- List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
- List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
-
- for (ExecutionBlock executionBlock : childs) {
- ebIds.add(executionBlock.getId().getProto());
- }
-
- getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds);
- }
-
- this.finalStageHistory = makeStageHistory();
- this.finalStageHistory.setTasks(makeTaskHistories());
- }
-
- public List<IntermediateEntry> getHashShuffleIntermediateEntries() {
- return hashShuffleIntermediateEntries;
- }
-
- protected void waitingIntermediateReport() {
- LOG.info(getId() + ", waiting for IntermediateReports: receivedReports=" + completeReportReceived.get());
- synchronized(completeReportReceived) {
- long startTime = System.currentTimeMillis();
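- // Poll in 10-second waits; if the reports have not all arrived within 120 seconds,
- // give up and fail the stage.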
- while (true) {
- if (completeReportReceived.get() >= tasks.size()) {
- LOG.info(getId() + ", completed waiting IntermediateReport");
- return;
- } else {
- try {
- completeReportReceived.wait(10 * 1000);
- } catch (InterruptedException e) {
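- // Ignore the interrupt and re-check the completion count and the timeout.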
- }
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (elapsedTime >= 120 * 1000) {
- LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms");
- abort(StageState.FAILED);
- return;
- }
- }
- }
- }
- }
-
- public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) {
- LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks());
- if (!report.getReportSuccess()) {
- LOG.error(getId() + ", ExecutionBlock final report failed, cause: " + report.getReportErrorMessage());
- abort(StageState.FAILED);
- return;
- }
- if (report.getIntermediateEntriesCount() > 0) {
- synchronized (hashShuffleIntermediateEntries) {
- for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) {
- hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm));
- }
- }
- }
- synchronized(completeReportReceived) {
- completeReportReceived.addAndGet(report.getSucceededTasks());
- completeReportReceived.notifyAll();
- }
- }
-
- private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> {
-
- @Override
- public StageState transition(Stage stage, StageEvent stageEvent) {
- // TODO - Commit Stage
- // TODO - records succeeded, failed, killed completed task
- // TODO - records metrics
- try {
- LOG.info(String.format("Stage completed - %s (total=%d, success=%d, killed=%d)",
- stage.getId().toString(),
- stage.getTotalScheduledObjectsCount(),
- stage.getSucceededObjectCount(),
- stage.killedObjectCount));
-
- if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) {
- if (stage.failedObjectCount > 0) {
- stage.abort(StageState.FAILED);
- return StageState.FAILED;
- } else if (stage.killedObjectCount > 0) {
- stage.abort(StageState.KILLED);
- return StageState.KILLED;
- } else {
- LOG.error("Invalid State " + stage.getSynchronizedState() + " State");
- stage.abort(StageState.ERROR);
- return StageState.ERROR;
- }
- } else {
- stage.complete();
- return StageState.SUCCEEDED;
- }
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- stage.abort(StageState.ERROR);
- return StageState.ERROR;
- }
- }
- }
-
- private static class DiagnosticsUpdateTransition implements SingleArcTransition<Stage, StageEvent> {
- @Override
- public void transition(Stage stage, StageEvent event) {
- stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
- }
- }
-
- private static class InternalErrorTransition implements SingleArcTransition<Stage, StageEvent> {
- @Override
- public void transition(Stage stage, StageEvent stageEvent) {
- stage.abort(StageState.ERROR);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
deleted file mode 100644
index 82a06fe..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-public enum StageState {
- NEW,
- INITED,
- RUNNING,
- SUCCEEDED,
- FAILED,
- KILL_WAIT,
- KILLED,
- ERROR
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
deleted file mode 100644
index 5475791..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java
+++ /dev/null
@@ -1,907 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.TaskId;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto;
-import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto;
-import org.apache.tajo.master.FragmentPair;
-import org.apache.tajo.master.TaskState;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.DataLocation;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TajoIdUtils;
-import org.apache.tajo.util.history.TaskHistory;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput;
-
-public class Task implements EventHandler<TaskEvent> {
- /** Class Logger */
- private static final Log LOG = LogFactory.getLog(Task.class);
-
- private final Configuration systemConf;
- private TaskId taskId;
- private EventHandler eventHandler;
- private StoreTableNode store = null;
- private LogicalNode plan = null;
- private List<ScanNode> scan;
-
- private Map<String, Set<FragmentProto>> fragMap;
- private Map<String, Set<FetchImpl>> fetchMap;
-
- private int totalFragmentNum;
-
- private List<ShuffleFileOutput> shuffleFileOutputs;
- private TableStats stats;
- private final boolean isLeafTask;
- private List<IntermediateEntry> intermediateData;
-
- private Map<TaskAttemptId, TaskAttempt> attempts;
- private final int maxAttempts = 3;
- private Integer nextAttempt = -1;
- private TaskAttemptId lastAttemptId;
-
- private TaskAttemptId successfulAttempt;
- private String succeededHost;
- private int succeededHostPort;
- private int succeededPullServerPort;
-
- private int failedAttempts;
- private int finishedAttempts; // finished is the total of succeeded, failed, and killed attempts
-
- private long launchTime;
- private long finishTime;
-
- private List<DataLocation> dataLocations = Lists.newArrayList();
-
- private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
-
- private TaskHistory finalTaskHistory;
-
- protected static final StateMachineFactory
- <Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
- new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
-
- // Transitions from NEW state
- .addTransition(TaskState.NEW, TaskState.SCHEDULED,
- TaskEventType.T_SCHEDULE,
- new InitialScheduleTransition())
- .addTransition(TaskState.NEW, TaskState.KILLED,
- TaskEventType.T_KILL,
- new KillNewTaskTransition())
-
- // Transitions from SCHEDULED state
- .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED,
- new AttemptLaunchedTransition())
- .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
- TaskEventType.T_KILL,
- new KillTaskTransition())
-
- // Transitions from RUNNING state
- .addTransition(TaskState.RUNNING, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED)
- .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
- TaskEventType.T_ATTEMPT_SUCCEEDED,
- new AttemptSucceededTransition())
- .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
- TaskEventType.T_KILL,
- new KillTaskTransition())
- .addTransition(TaskState.RUNNING,
- EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
- TaskEventType.T_ATTEMPT_FAILED,
- new AttemptFailedOrRetryTransition())
-
- // Transitions from KILL_WAIT state
- .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
- TaskEventType.T_ATTEMPT_KILLED,
- ATTEMPT_KILLED_TRANSITION)
- .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
- TaskEventType.T_ATTEMPT_LAUNCHED,
- new KillTaskTransition())
- .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
- TaskEventType.T_ATTEMPT_FAILED,
- new AttemptFailedTransition())
- .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
- TaskEventType.T_ATTEMPT_SUCCEEDED,
- ATTEMPT_KILLED_TRANSITION)
- // Ignore-able transitions.
- .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
- EnumSet.of(
- TaskEventType.T_KILL,
- TaskEventType.T_SCHEDULE))
-
- // Transitions from SUCCEEDED state
- // Ignore-able transitions
- .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
- EnumSet.of(TaskEventType.T_KILL,
- TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
- // Transitions from FAILED state
- // Ignore-able transitions
- .addTransition(TaskState.FAILED, TaskState.FAILED,
- EnumSet.of(TaskEventType.T_KILL,
- TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
- // Transitions from KILLED state
- .addTransition(TaskState.KILLED, TaskState.KILLED, TaskEventType.T_ATTEMPT_KILLED, new KillTaskTransition())
- // Ignore-able transitions
- .addTransition(TaskState.KILLED, TaskState.KILLED,
- EnumSet.of(
- TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))
-
- .installTopology();
-
- private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
-
-
- private final Lock readLock;
- private final Lock writeLock;
- private TaskAttemptScheduleContext scheduleContext;
-
- public Task(Configuration conf, TaskAttemptScheduleContext scheduleContext,
- TaskId id, boolean isLeafTask, EventHandler eventHandler) {
- this.systemConf = conf;
- this.taskId = id;
- this.eventHandler = eventHandler;
- this.isLeafTask = isLeafTask;
- scan = new ArrayList<ScanNode>();
- fetchMap = Maps.newHashMap();
- fragMap = Maps.newHashMap();
- shuffleFileOutputs = new ArrayList<ShuffleFileOutput>();
- attempts = Collections.emptyMap();
- lastAttemptId = null;
- nextAttempt = -1;
- failedAttempts = 0;
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
- this.scheduleContext = scheduleContext;
-
- stateMachine = stateMachineFactory.make(this);
- totalFragmentNum = 0;
- }
-
- public boolean isLeafTask() {
- return this.isLeafTask;
- }
-
- public TaskState getState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- public TaskAttemptState getLastAttemptStatus() {
- TaskAttempt lastAttempt = getLastAttempt();
- if (lastAttempt != null) {
- return lastAttempt.getState();
- } else {
- return TaskAttemptState.TA_ASSIGNED;
- }
- }
-
- public TaskHistory getTaskHistory() {
- if (finalTaskHistory != null) {
- if (finalTaskHistory.getFinishTime() == 0) {
- finalTaskHistory = makeTaskHistory();
- }
- return finalTaskHistory;
- } else {
- return makeTaskHistory();
- }
- }
-
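- /**
- * Builds a point-in-time TaskHistory snapshot from the last attempt plus
- * this task's shuffle outputs, fragments, fetch URIs, and data locations.
- */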
- private TaskHistory makeTaskHistory() {
- TaskHistory taskHistory = new TaskHistory();
-
- TaskAttempt lastAttempt = getLastAttempt();
- if (lastAttempt != null) {
- taskHistory.setId(lastAttempt.getId().toString());
- taskHistory.setState(lastAttempt.getState().toString());
- taskHistory.setProgress(lastAttempt.getProgress());
- }
- taskHistory.setHostAndPort(succeededHost + ":" + succeededHostPort);
- taskHistory.setRetryCount(this.getRetryCount());
- taskHistory.setLaunchTime(launchTime);
- taskHistory.setFinishTime(finishTime);
-
- taskHistory.setNumShuffles(getShuffleOutputNum());
- if (!getShuffleFileOutputs().isEmpty()) {
- ShuffleFileOutput firstShuffleOutput = getShuffleFileOutputs().get(0);
- if (taskHistory.getNumShuffles() > 0) {
- taskHistory.setShuffleKey(String.valueOf(firstShuffleOutput.getPartId()));
- taskHistory.setShuffleFileName(firstShuffleOutput.getFileName());
- }
- }
-
- List<String> fragmentList = new ArrayList<String>();
- for (FragmentProto eachFragment : getAllFragments()) {
- try {
- Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment);
- fragmentList.add(fragment.toString());
- } catch (Exception e) {
- LOG.error(e.getMessage());
- fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage());
- }
- }
- taskHistory.setFragments(fragmentList.toArray(new String[]{}));
-
- List<String[]> fetchList = new ArrayList<String[]>();
- for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
- for (FetchImpl f : e.getValue()) {
- for (URI uri : f.getSimpleURIs()){
- fetchList.add(new String[] {e.getKey(), uri.toString()});
- }
- }
- }
-
- taskHistory.setFetchs(fetchList.toArray(new String[][]{}));
-
- List<String> dataLocationList = new ArrayList<String>();
- for(DataLocation eachLocation: getDataLocations()) {
- dataLocationList.add(eachLocation.toString());
- }
-
- taskHistory.setDataLocations(dataLocationList.toArray(new String[]{}));
- return taskHistory;
- }
-
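- /**
- * Stores the logical plan of this task and collects every ScanNode in it,
- * walking the plan tree depth-first with an explicit stack.
- */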
- public void setLogicalPlan(LogicalNode plan) {
- this.plan = plan;
-
- LogicalNode node = plan;
- ArrayList<LogicalNode> stack = new ArrayList<LogicalNode>();
- stack.add(node);
- while (!stack.isEmpty()) {
- node = stack.remove(stack.size() - 1);
- if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode) node;
- stack.add(unary.getChild());
- } else if (node instanceof BinaryNode) {
- BinaryNode binary = (BinaryNode) node;
- stack.add(binary.getLeftChild());
- stack.add(binary.getRightChild());
- } else if (node instanceof ScanNode) {
- scan.add((ScanNode) node);
- } else if (node instanceof TableSubQueryNode) {
- stack.add(((TableSubQueryNode) node).getSubQuery());
- }
- }
- }
-
- private void addDataLocation(Fragment fragment) {
- String[] hosts = fragment.getHosts();
- int[] diskIds = null;
- if (fragment instanceof FileFragment) {
- diskIds = ((FileFragment)fragment).getDiskIds();
- }
- for (int i = 0; i < hosts.length; i++) {
- dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i]));
- }
- }
-
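- /**
- * Registers a fragment under its table name and, if requested, records the
- * fragment's hosts (and disk ids for file fragments) as candidate data
- * locations, presumably for locality-aware scheduling.
- */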
- public void addFragment(Fragment fragment, boolean useDataLocation) {
- Set<FragmentProto> fragmentProtos;
- if (fragMap.containsKey(fragment.getTableName())) {
- fragmentProtos = fragMap.get(fragment.getTableName());
- } else {
- fragmentProtos = new HashSet<FragmentProto>();
- fragMap.put(fragment.getTableName(), fragmentProtos);
- }
- fragmentProtos.add(fragment.getProto());
- if (useDataLocation) {
- addDataLocation(fragment);
- }
- totalFragmentNum++;
- }
-
- public void addFragments(Collection<Fragment> fragments) {
- for (Fragment eachFragment: fragments) {
- addFragment(eachFragment, false);
- }
- }
-
- public void setFragment(FragmentPair[] fragmentPairs) {
- for (FragmentPair eachFragmentPair : fragmentPairs) {
- this.addFragment(eachFragmentPair.getLeftFragment(), true);
- if (eachFragmentPair.getRightFragment() != null) {
- this.addFragment(eachFragmentPair.getRightFragment(), true);
- }
- }
- }
-
- public List<DataLocation> getDataLocations() {
- return dataLocations;
- }
-
- public String getSucceededHost() {
- return succeededHost;
- }
-
- public void addFetches(String tableId, Collection<FetchImpl> fetches) {
- Set<FetchImpl> fetchSet;
- if (fetchMap.containsKey(tableId)) {
- fetchSet = fetchMap.get(tableId);
- } else {
- fetchSet = Sets.newHashSet();
- }
- fetchSet.addAll(fetches);
- fetchMap.put(tableId, fetchSet);
- }
-
- public void setFetches(Map<String, Set<FetchImpl>> fetches) {
- this.fetchMap.clear();
- this.fetchMap.putAll(fetches);
- }
-
- public Collection<FragmentProto> getAllFragments() {
- Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>();
- for (Set<FragmentProto> eachFragmentSet : fragMap.values()) {
- fragmentProtos.addAll(eachFragmentSet);
- }
- return fragmentProtos;
- }
-
- public LogicalNode getLogicalPlan() {
- return this.plan;
- }
-
- public TaskId getId() {
- return taskId;
- }
-
- public Collection<FetchImpl> getFetchHosts(String tableId) {
- return fetchMap.get(tableId);
- }
-
- public Collection<Set<FetchImpl>> getFetches() {
- return fetchMap.values();
- }
-
- public Map<String, Set<FetchImpl>> getFetchMap() {
- return fetchMap;
- }
-
- public Collection<FetchImpl> getFetch(ScanNode scan) {
- return this.fetchMap.get(scan.getTableName());
- }
-
- public ScanNode[] getScanNodes() {
- return this.scan.toArray(new ScanNode[scan.size()]);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append(plan.getType()).append("\n");
- for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) {
- builder.append(e.getKey()).append(" : ");
- for (FragmentProto fragment : e.getValue()) {
- builder.append(fragment).append(", ");
- }
- }
- for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) {
- builder.append(e.getKey()).append(" : ");
- for (FetchImpl t : e.getValue()) {
- for (URI uri : t.getURIs()){
- builder.append(uri).append(" ");
- }
- }
- }
-
- return builder.toString();
- }
-
- public void setStats(TableStats stats) {
- this.stats = stats;
- }
-
- public void setShuffleFileOutputs(List<ShuffleFileOutput> partitions) {
- this.shuffleFileOutputs = Collections.unmodifiableList(partitions);
- }
-
- public TableStats getStats() {
- return this.stats;
- }
-
- public List<ShuffleFileOutput> getShuffleFileOutputs() {
- return this.shuffleFileOutputs;
- }
-
- public int getShuffleOutputNum() {
- return this.shuffleFileOutputs.size();
- }
-
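- /**
- * Creates the next attempt of this task, advancing the attempt counter and
- * recording the new attempt id as the last one issued.
- */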
- public TaskAttempt newAttempt() {
- TaskAttempt attempt = new TaskAttempt(scheduleContext,
- QueryIdFactory.newTaskAttemptId(this.getId(), ++nextAttempt),
- this, eventHandler);
- lastAttemptId = attempt.getId();
- return attempt;
- }
-
- public TaskAttempt getAttempt(TaskAttemptId attemptId) {
- return attempts.get(attemptId);
- }
-
- public TaskAttempt getAttempt(int attempt) {
- return this.attempts.get(QueryIdFactory.newTaskAttemptId(this.getId(), attempt));
- }
-
- public TaskAttempt getLastAttempt() {
- return getAttempt(this.lastAttemptId);
- }
-
- public TaskAttempt getSuccessfulAttempt() {
- readLock.lock();
- try {
- if (null == successfulAttempt) {
- return null;
- }
- return attempts.get(successfulAttempt);
- } finally {
- readLock.unlock();
- }
- }
-
- public int getRetryCount() {
- return this.nextAttempt;
- }
-
- public int getTotalFragmentNum() {
- return totalFragmentNum;
- }
-
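- /**
- * On T_SCHEDULE, creates and schedules the first attempt of the task.
- */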
- private static class InitialScheduleTransition implements
- SingleArcTransition<Task, TaskEvent> {
-
- @Override
- public void transition(Task task, TaskEvent taskEvent) {
- task.addAndScheduleAttempt();
- }
- }
-
- public long getLaunchTime() {
- return launchTime;
- }
-
- public long getFinishTime() {
- return finishTime;
- }
-
- @VisibleForTesting
- public void setLaunchTime(long launchTime) {
- this.launchTime = launchTime;
- }
-
- @VisibleForTesting
- public void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
- }
-
- public long getRunningTime() {
- if (finishTime > 0) {
- return finishTime - launchTime;
- } else {
- return System.currentTimeMillis() - launchTime;
- }
- }
-
- // This is always called in the Write Lock
- private void addAndScheduleAttempt() {
- // Create new task attempt
- TaskAttempt attempt = newAttempt();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created attempt " + attempt.getId());
- }
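- // Lazily upgrade the attempts map: the first attempt lives in an
- // immutable singleton map, and a real LinkedHashMap is allocated only if
- // a second attempt is ever created, presumably to keep per-task memory low.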
- switch (attempts.size()) {
- case 0:
- attempts = Collections.singletonMap(attempt.getId(), attempt);
- break;
-
- case 1:
- Map<TaskAttemptId, TaskAttempt> newAttempts
- = new LinkedHashMap<TaskAttemptId, TaskAttempt>(3);
- newAttempts.putAll(attempts);
- attempts = newAttempts;
- attempts.put(attempt.getId(), attempt);
- break;
-
- default:
- attempts.put(attempt.getId(), attempt);
- break;
- }
-
- if (failedAttempts > 0) {
- eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
- TaskAttemptEventType.TA_RESCHEDULE));
- } else {
- eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(),
- TaskAttemptEventType.TA_SCHEDULE));
- }
- }
-
- private void finishTask() {
- this.finishTime = System.currentTimeMillis();
- finalTaskHistory = makeTaskHistory();
- }
-
- private static class KillNewTaskTransition implements SingleArcTransition<Task, TaskEvent> {
-
- @Override
- public void transition(Task task, TaskEvent taskEvent) {
- task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.KILLED));
- }
- }
-
- private static class KillTaskTransition implements SingleArcTransition<Task, TaskEvent> {
-
- @Override
- public void transition(Task task, TaskEvent taskEvent) {
- task.finishTask();
- task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL));
- }
- }
-
- private static class AttemptKilledTransition implements SingleArcTransition<Task, TaskEvent>{
-
- @Override
- public void transition(Task task, TaskEvent event) {
- task.finishTask();
- task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.KILLED));
- }
- }
-
- private static class AttemptSucceededTransition
- implements SingleArcTransition<Task, TaskEvent>{
-
- @Override
- public void transition(Task task,
- TaskEvent event) {
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
-
- task.successfulAttempt = attemptEvent.getTaskAttemptId();
- task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
- task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
- task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
-
- task.finishTask();
- task.eventHandler.handle(new StageTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
- }
- }
-
- private static class AttemptLaunchedTransition implements SingleArcTransition<Task, TaskEvent> {
- @Override
- public void transition(Task task,
- TaskEvent event) {
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
- task.launchTime = System.currentTimeMillis();
- task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
- task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort();
- }
- }
-
- private static class AttemptFailedTransition implements SingleArcTransition<Task, TaskEvent> {
- @Override
- public void transition(Task task, TaskEvent event) {
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- LOG.info("=============================================================");
- LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
- LOG.info("=============================================================");
- task.failedAttempts++;
- task.finishedAttempts++;
-
- task.finishTask();
- task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED));
- }
- }
-
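- /**
- * A failed attempt in the RUNNING state either triggers a retry (a new
- * attempt is scheduled, unless one already succeeded, and the task stays
- * RUNNING) or, once maxAttempts failures have accumulated, moves the task
- * to FAILED and notifies its stage.
- */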
- private static class AttemptFailedOrRetryTransition implements
- MultipleArcTransition<Task, TaskEvent, TaskState> {
-
- @Override
- public TaskState transition(Task task, TaskEvent taskEvent) {
- TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
- task.failedAttempts++;
- task.finishedAttempts++;
- boolean retry = task.failedAttempts < task.maxAttempts;
-
- LOG.info("====================================================================================");
- LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " +
- "retry:" + retry + ", attempts:" + task.failedAttempts + " <<<");
- LOG.info("====================================================================================");
-
- if (retry) {
- if (task.successfulAttempt == null) {
- task.addAndScheduleAttempt();
- }
- } else {
- task.finishTask();
- task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED));
- return TaskState.FAILED;
- }
-
- return task.getState();
- }
- }
-
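- /**
- * Dispatches a task event through the state machine under the write lock.
- * An invalid transition is logged and escalated to the query as an
- * INTERNAL_ERROR event instead of being rethrown to the caller.
- */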
- @Override
- public void handle(TaskEvent event) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getTaskId() + " of type "
- + event.getType());
- }
-
- writeLock.lock();
- try {
- TaskState oldState = getState();
- try {
- stateMachine.doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state"
- + ", eventType:" + event.getType().name()
- + ", oldState:" + oldState.name()
- + ", nextState:" + getState().name()
- , e);
- eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()),
- QueryEventType.INTERNAL_ERROR));
- }
-
- // notify the event handler of the state change
- if (LOG.isDebugEnabled()) {
- if (oldState != getState()) {
- LOG.debug(taskId + " Task Transitioned from " + oldState + " to "
- + getState());
- }
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public void setIntermediateData(Collection<IntermediateEntry> partitions) {
- this.intermediateData = new ArrayList<IntermediateEntry>(partitions);
- }
-
- public List<IntermediateEntry> getIntermediateData() {
- return this.intermediateData;
- }
-
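- /**
- * Host and port of the pull server that serves a task's shuffle output.
- * The hash code is precomputed in the constructor, presumably because
- * instances are effectively immutable and are used as hash keys.
- */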
- public static class PullHost implements Cloneable {
- String host;
- int port;
- int hashCode;
-
- public PullHost(String pullServerAddr, int pullServerPort) {
- this.host = pullServerAddr;
- this.port = pullServerPort;
- this.hashCode = Objects.hashCode(host, port);
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return this.port;
- }
-
- public String getPullAddress() {
- return host + ":" + port;
- }
-
- @Override
- public int hashCode() {
- return hashCode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof PullHost) {
- PullHost other = (PullHost) obj;
- return host.equals(other.host) && port == other.port;
- }
-
- return false;
- }
-
- @Override
- public PullHost clone() throws CloneNotSupportedException {
- return (PullHost) super.clone();
- }
-
- @Override
- public String toString() {
- return host + ":" + port;
- }
- }
-
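- /**
- * Metadata for one piece of intermediate (shuffle) output: the execution
- * block, task, attempt, and partition that produced it, the pull server it
- * can be fetched from, its volume, its page layout, and any failed row
- * ranges reported for it.
- */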
- public static class IntermediateEntry {
- ExecutionBlockId ebId;
- int taskId;
- int attemptId;
- int partId;
- PullHost host;
- long volume;
- List<Pair<Long, Integer>> pages;
- List<Pair<Long, Pair<Integer, Integer>>> failureRowNums;
-
- public IntermediateEntry(IntermediateEntryProto proto) {
- this.ebId = new ExecutionBlockId(proto.getEbId());
- this.taskId = proto.getTaskId();
- this.attemptId = proto.getAttemptId();
- this.partId = proto.getPartId();
-
- String[] pullHost = proto.getHost().split(":");
- this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1]));
- this.volume = proto.getVolume();
-
- failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
- for (FailureIntermediateProto eachFailure: proto.getFailuresList()) {
- failureRowNums.add(new Pair<Long, Pair<Integer, Integer>>(eachFailure.getPagePos(),
- new Pair<Integer, Integer>(eachFailure.getStartRowNum(), eachFailure.getEndRowNum())));
- }
-
- pages = new ArrayList<Pair<Long, Integer>>();
- for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) {
- pages.add(new Pair<Long, Integer>(eachPage.getPos(), eachPage.getLength()));
- }
- }
-
- public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) {
- this.taskId = taskId;
- this.attemptId = attemptId;
- this.partId = partId;
- this.host = host;
- }
-
- public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume) {
- this.taskId = taskId;
- this.attemptId = attemptId;
- this.partId = partId;
- this.host = host;
- this.volume = volume;
- }
-
- public ExecutionBlockId getEbId() {
- return ebId;
- }
-
- public void setEbId(ExecutionBlockId ebId) {
- this.ebId = ebId;
- }
-
- public int getTaskId() {
- return this.taskId;
- }
-
- public int getAttemptId() {
- return this.attemptId;
- }
-
- public int getPartId() {
- return this.partId;
- }
-
- public PullHost getPullHost() {
- return this.host;
- }
-
- public long getVolume() {
- return this.volume;
- }
-
- public long setVolume(long volume) {
- return this.volume = volume;
- }
-
- public List<Pair<Long, Integer>> getPages() {
- return pages;
- }
-
- public void setPages(List<Pair<Long, Integer>> pages) {
- this.pages = pages;
- }
-
- public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() {
- return failureRowNums;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(ebId, taskId, partId, attemptId, host);
- }
-
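- /**
- * Splits this entry's pages into contiguous (offset, length) chunks of
- * roughly splitVolume bytes (the first chunk may target firstSplitVolume
- * instead). A chunk is closed before appending a page that would reach the
- * target volume, so, for example, pages of lengths 10, 20, and 30 with a
- * split volume of 25 come out as three chunks of 10, 20, and 30 bytes.
- */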
- public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) {
- List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>();
-
- if (pages == null || pages.isEmpty()) {
- return splits;
- }
- int pageSize = pages.size();
-
- long currentOffset = -1;
- long currentBytes = 0;
-
- long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;
- for (int i = 0; i < pageSize; i++) {
- Pair<Long, Integer> eachPage = pages.get(i);
- if (currentOffset == -1) {
- currentOffset = eachPage.getFirst();
- }
- if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) {
- splits.add(new Pair<Long, Long>(currentOffset, currentBytes));
- currentOffset = eachPage.getFirst();
- currentBytes = 0;
- realSplitVolume = splitVolume;
- }
-
- currentBytes += eachPage.getSecond();
- }
-
- //add last
- if (currentBytes > 0) {
- splits.add(new Pair<Long, Long>(currentOffset, currentBytes));
- }
- return splits;
- }
- }
-}