You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:44 UTC
[22/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
new file mode 100644
index 0000000..63b50ac
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -0,0 +1,1125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.*;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
+
+
+/**
+ * SubQuery plays a role in controlling an ExecutionBlock and is a finite state machine.
+ */
+public class SubQuery implements EventHandler<SubQueryEvent> {
+
+ // ---- Static constants -------------------------------------------------
+ private static final Log LOG = LogFactory.getLog(SubQuery.class);
+
+ // Transition hooks shared by several arcs of the state machine defined below.
+ private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+ private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
+ private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
+ private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
+     new AllocatedContainersCancelTransition();
+ private static final SubQueryCompleteTransition SUBQUERY_COMPLETED_TRANSITION =
+     new SubQueryCompleteTransition();
+
+ // ---- Collaborators supplied at construction time ----------------------
+ private QueryMasterTask.QueryMasterTaskContext context;
+ private MasterPlan masterPlan;
+ private ExecutionBlock block;
+ private final AbstractStorageManager sm;
+ private EventHandler<Event> eventHandler;
+
+ // ---- Scheduling state -------------------------------------------------
+ private int priority;
+ private AbstractTaskScheduler taskScheduler;
+ private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> stateMachine;
+
+ // ---- Results written by finalizeStats() -------------------------------
+ private Schema schema;
+ private TableMeta meta;
+ private TableStats resultStatistics;
+ private TableStats inputStatistics;
+
+ // ---- Bookkeeping ------------------------------------------------------
+ private final List<String> diagnostics = new ArrayList<String>();
+ private long startTime;
+ private long finishTime;
+
+ // Query units and containers of this subquery, keyed by their ids.
+ volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>();
+ volatile Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+
+ /**
+  * Transition table of the SubQuery finite state machine.
+  *
+  * <p>Hadoop YARN's StateMachineFactory installs transitions into a per-state map in registration
+  * order, so a later addTransition() for the same (state, event) pair overwrites an earlier one.
+  * SQ_CONTAINER_ALLOCATED is therefore deliberately NOT part of the "ignore-able" sets of the
+  * SUCCEEDED and FAILED states. Listing it there would replace CONTAINERS_CANCEL_TRANSITION with a
+  * no-op, and containers allocated after completion would never be released.
+  */
+ protected static final StateMachineFactory<SubQuery, SubQueryState,
+     SubQueryEventType, SubQueryEvent> stateMachineFactory =
+     new StateMachineFactory <SubQuery, SubQueryState,
+         SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
+
+         // Transitions from NEW state
+         .addTransition(SubQueryState.NEW,
+             EnumSet.of(SubQueryState.INITED, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
+             SubQueryEventType.SQ_INIT,
+             new InitAndRequestContainer())
+         .addTransition(SubQueryState.NEW, SubQueryState.NEW,
+             SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+             DIAGNOSTIC_UPDATE_TRANSITION)
+         .addTransition(SubQueryState.NEW, SubQueryState.KILLED,
+             SubQueryEventType.SQ_KILL)
+         .addTransition(SubQueryState.NEW, SubQueryState.ERROR,
+             SubQueryEventType.SQ_INTERNAL_ERROR,
+             INTERNAL_ERROR_TRANSITION)
+
+         // Transitions from INITED state
+         .addTransition(SubQueryState.INITED, SubQueryState.RUNNING,
+             SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+             CONTAINER_LAUNCH_TRANSITION)
+         .addTransition(SubQueryState.INITED, SubQueryState.INITED,
+             SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+             DIAGNOSTIC_UPDATE_TRANSITION)
+         .addTransition(SubQueryState.INITED, SubQueryState.KILL_WAIT,
+             SubQueryEventType.SQ_KILL)
+         .addTransition(SubQueryState.INITED, SubQueryState.ERROR,
+             SubQueryEventType.SQ_INTERNAL_ERROR,
+             INTERNAL_ERROR_TRANSITION)
+
+         // Transitions from RUNNING state
+         .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+             SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+             CONTAINER_LAUNCH_TRANSITION)
+         .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+             SubQueryEventType.SQ_TASK_COMPLETED,
+             TASK_COMPLETED_TRANSITION)
+         .addTransition(SubQueryState.RUNNING,
+             EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED),
+             SubQueryEventType.SQ_SUBQUERY_COMPLETED,
+             SUBQUERY_COMPLETED_TRANSITION)
+         .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+             SubQueryEventType.SQ_FAILED,
+             TASK_COMPLETED_TRANSITION)
+         .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+             SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+             DIAGNOSTIC_UPDATE_TRANSITION)
+         .addTransition(SubQueryState.RUNNING, SubQueryState.KILL_WAIT,
+             SubQueryEventType.SQ_KILL,
+             new KillTasksTransition())
+         .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
+             SubQueryEventType.SQ_INTERNAL_ERROR,
+             INTERNAL_ERROR_TRANSITION)
+         // Ignore-able Transition
+         .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+             SubQueryEventType.SQ_START)
+
+         // Transitions from KILL_WAIT state
+         .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+             SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+             CONTAINERS_CANCEL_TRANSITION)
+         .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+             EnumSet.of(SubQueryEventType.SQ_KILL))
+         .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+             SubQueryEventType.SQ_TASK_COMPLETED,
+             TASK_COMPLETED_TRANSITION)
+         .addTransition(SubQueryState.KILL_WAIT,
+             EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED, SubQueryState.KILLED),
+             SubQueryEventType.SQ_SUBQUERY_COMPLETED,
+             SUBQUERY_COMPLETED_TRANSITION)
+         .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+             SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+             DIAGNOSTIC_UPDATE_TRANSITION)
+         .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+             SubQueryEventType.SQ_FAILED,
+             TASK_COMPLETED_TRANSITION)
+         .addTransition(SubQueryState.KILL_WAIT, SubQueryState.ERROR,
+             SubQueryEventType.SQ_INTERNAL_ERROR,
+             INTERNAL_ERROR_TRANSITION)
+
+         // Transitions from SUCCEEDED state
+         .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+             SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+             CONTAINERS_CANCEL_TRANSITION)
+         .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+             SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+             DIAGNOSTIC_UPDATE_TRANSITION)
+         .addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR,
+             SubQueryEventType.SQ_INTERNAL_ERROR,
+             INTERNAL_ERROR_TRANSITION)
+         // Ignore-able events (SQ_CONTAINER_ALLOCATED is handled above; see class comment)
+         .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+             EnumSet.of(
+                 SubQueryEventType.SQ_START,
+                 SubQueryEventType.SQ_KILL))
+
+         // Transitions from FAILED state
+         .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+             SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+             CONTAINERS_CANCEL_TRANSITION)
+         .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+             SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+             DIAGNOSTIC_UPDATE_TRANSITION)
+         .addTransition(SubQueryState.FAILED, SubQueryState.ERROR,
+             SubQueryEventType.SQ_INTERNAL_ERROR,
+             INTERNAL_ERROR_TRANSITION)
+         // Ignore-able transitions (SQ_CONTAINER_ALLOCATED is handled above; see class comment)
+         .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+             EnumSet.of(
+                 SubQueryEventType.SQ_START,
+                 SubQueryEventType.SQ_KILL,
+                 SubQueryEventType.SQ_FAILED))
+
+         // Transitions from ERROR state
+         .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+             SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+             CONTAINERS_CANCEL_TRANSITION)
+         .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+             SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
+             DIAGNOSTIC_UPDATE_TRANSITION)
+         // Ignore-able transitions
+         .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+             EnumSet.of(
+                 SubQueryEventType.SQ_START,
+                 SubQueryEventType.SQ_KILL,
+                 SubQueryEventType.SQ_FAILED,
+                 SubQueryEventType.SQ_INTERNAL_ERROR))
+
+         .installTopology();
+
+ // Both locks come from the single ReentrantReadWriteLock created in the constructor:
+ // the write lock is held while the state machine transitions, the read lock for state/progress reads.
+ private final Lock readLock;
+ private final Lock writeLock;
+
+ // Progress counters; Java zero-initializes int fields, so no explicit initializers are needed.
+ private int totalScheduledObjectsCount;
+ private int succeededObjectCount;
+ private int completedTaskCount;
+ private int succeededTaskCount;
+ private int killedObjectCount;
+ private int failedObjectCount;
+
+ // Created together with the task scheduler during SQ_INIT handling.
+ private TaskSchedulerContext schedulerContext;
+
+ /**
+  * Creates a subquery controlling one execution block. The state machine starts in
+  * {@link SubQueryState#NEW}.
+  *
+  * @param context    the owning query master task context; also supplies the event handler
+  * @param masterPlan the global plan that contains {@code block}
+  * @param block      the execution block controlled by this subquery
+  * @param sm         the storage manager, used e.g. to compute input splits
+  */
+ public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block,
+                 AbstractStorageManager sm) {
+   this.context = context;
+   this.masterPlan = masterPlan;
+   this.block = block;
+   this.sm = sm;
+   this.eventHandler = context.getEventHandler();
+
+   // One read/write lock pair is shared by all state reads and transitions.
+   ReadWriteLock rwLock = new ReentrantReadWriteLock();
+   readLock = rwLock.readLock();
+   writeLock = rwLock.writeLock();
+
+   this.stateMachine = stateMachineFactory.make(this);
+ }
+
+ public static boolean isRunningState(SubQueryState state) {
+ return state == SubQueryState.INITED || state == SubQueryState.NEW || state == SubQueryState.RUNNING;
+ }
+
+ /** @return the query master task context this subquery belongs to */
+ public QueryMasterTask.QueryMasterTaskContext getContext() {
+   return this.context;
+ }
+
+ /** @return the global plan containing this subquery's execution block */
+ public MasterPlan getMasterPlan() {
+   return this.masterPlan;
+ }
+
+ /**
+  * Returns the first outgoing data channel of this subquery's block.
+  * Assumes the block has at least one outgoing channel.
+  */
+ public DataChannel getDataChannel() {
+   Iterator<DataChannel> outgoing = masterPlan.getOutgoingChannels(getId()).iterator();
+   return outgoing.next();
+ }
+
+ /** @return the event handler obtained from the context at construction time */
+ public EventHandler<Event> getEventHandler() {
+   return this.eventHandler;
+ }
+
+ /** @return the task scheduler, or {@code null} if it has not been created yet */
+ public AbstractTaskScheduler getTaskScheduler() {
+   return this.taskScheduler;
+ }
+
+ public void setStartTime() {
+ startTime = context.getClock().getTime();
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ public void setFinishTime() {
+ finishTime = context.getClock().getTime();
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ /**
+  * Returns the fraction of scheduled objects that have succeeded, in [0, 1].
+  *
+  * <p>Returns 0 while the subquery is NEW. When no objects were scheduled (e.g. union blocks or
+  * blocks without input), the denominator is clamped to 1 so the result is 0 instead of NaN.
+  */
+ public float getTaskProgress() {
+   readLock.lock();
+   try {
+     if (getState() == SubQueryState.NEW) {
+       return 0;
+     }
+     return (float) succeededObjectCount / (float) Math.max(totalScheduledObjectsCount, 1);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Returns the average progress of the last attempt of every query unit.
+  *
+  * <p>Returns 0 while the subquery is NEW. Units without an attempt contribute 0. When the
+  * subquery has no query units, the denominator is clamped to 1 so the result is 0 instead of NaN.
+  */
+ public float getProgress() {
+   List<QueryUnit> tempTasks;
+   readLock.lock();
+   try {
+     if (getState() == SubQueryState.NEW) {
+       return 0;
+     }
+     // Snapshot under the lock; the per-attempt progress is read without holding it.
+     tempTasks = new ArrayList<QueryUnit>(tasks.values());
+   } finally {
+     readLock.unlock();
+   }
+
+   float totalProgress = 0.0f;
+   for (QueryUnit eachQueryUnit : tempTasks) {
+     if (eachQueryUnit.getLastAttempt() != null) {
+       totalProgress += eachQueryUnit.getLastAttempt().getProgress();
+     }
+   }
+
+   return totalProgress / (float) Math.max(tempTasks.size(), 1);
+ }
+
+ public int getSucceededObjectCount() {
+ return succeededObjectCount;
+ }
+
+ public int getTotalScheduledObjectsCount() {
+ return totalScheduledObjectsCount;
+ }
+
+ public ExecutionBlock getBlock() {
+ return block;
+ }
+
+ public void addTask(QueryUnit task) {
+ tasks.put(task.getId(), task);
+ }
+
+ /**
+ * It finalizes this subquery. It is only invoked when the subquery is succeeded.
+ */
+ public void complete() {
+ cleanup();
+ finalizeStats();
+ setFinishTime();
+ eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED));
+ }
+
+ /**
+ * It finalizes this subquery. Unlike {@link SubQuery#complete()},
+ * it is invoked when a subquery is abnormally finished.
+ *
+ * @param finalState The final subquery state
+ */
+ public void abort(SubQueryState finalState) {
+ // TODO -
+ // - committer.abortSubQuery(...)
+ // - record SubQuery Finish Time
+ // - CleanUp Tasks
+ // - Record History
+ cleanup();
+ setFinishTime();
+ eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
+ }
+
+ public StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> getStateMachine() {
+ return this.stateMachine;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+
+ public int getPriority() {
+ return this.priority;
+ }
+
+ public AbstractStorageManager getStorageManager() {
+ return sm;
+ }
+
+ public ExecutionBlockId getId() {
+ return block.getId();
+ }
+
+ public QueryUnit[] getQueryUnits() {
+ return tasks.values().toArray(new QueryUnit[tasks.size()]);
+ }
+
+ public QueryUnit getQueryUnit(QueryUnitId qid) {
+ return tasks.get(qid);
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public TableMeta getTableMeta() {
+ return meta;
+ }
+
+ public TableStats getResultStats() {
+ return resultStatistics;
+ }
+
+ public TableStats getInputStats() {
+ return inputStatistics;
+ }
+
+ /**
+  * Returns a snapshot of the diagnostic messages collected so far.
+  *
+  * <p>The copy is taken under the read lock. Returning the internal list itself would let callers
+  * iterate it while {@link #addDiagnostic(String)} appends to it, which defeated the lock.
+  */
+ public List<String> getDiagnostics() {
+   readLock.lock();
+   try {
+     return new ArrayList<String>(diagnostics);
+   } finally {
+     readLock.unlock();
+   }
+ }
+
+ /**
+  * Appends one diagnostic message.
+  * NOTE(review): this method takes no lock itself; callers presumably hold the write lock
+  * (e.g. from within a state transition) — confirm.
+  */
+ protected void addDiagnostic(String diag) {
+   this.diagnostics.add(diag);
+ }
+
+ /** @return the string form of this subquery's execution block id */
+ @Override
+ public String toString() {
+   return String.valueOf(this.getId());
+ }
+
+ /** Two subqueries are equal when their execution block ids are equal. */
+ @Override
+ public boolean equals(Object o) {
+   if (!(o instanceof SubQuery)) {
+     return false;
+   }
+   ExecutionBlockId otherId = ((SubQuery) o).getId();
+   return getId().equals(otherId);
+ }
+
+ /** Consistent with {@link #equals(Object)}: hashes the execution block id. */
+ @Override
+ public int hashCode() {
+   ExecutionBlockId id = getId();
+   return id.hashCode();
+ }
+
+ /** Orders subqueries by their execution block ids. */
+ public int compareTo(SubQuery other) {
+   ExecutionBlockId otherId = other.getId();
+   return getId().compareTo(otherId);
+ }
+
+ public SubQueryState getState() {
+ readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+  * Computes the statistics of a union block by summing the statistics of its child subqueries.
+  * Missing (null) child statistics are skipped.
+  *
+  * @param subQuery the union subquery
+  * @return a two-element array: index 0 holds the summed input stats, index 1 the summed result
+  *         stats together with the concatenated column stats of the children
+  */
+ public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) {
+   TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
+   long[] avgRows = new long[]{0, 0};
+   long[] numBytes = new long[]{0, 0};
+   long[] readBytes = new long[]{0, 0};
+   long[] numRows = new long[]{0, 0};
+   int[] numBlocks = new int[]{0, 0};
+   int[] numOutputs = new int[]{0, 0};
+
+   List<ColumnStats> columnStatses = Lists.newArrayList();
+
+   MasterPlan masterPlan = subQuery.getMasterPlan();
+   for (ExecutionBlock block : masterPlan.getChilds(subQuery.getBlock())) {
+     SubQuery childSubQuery = subQuery.context.getSubQuery(block.getId());
+     TableStats[] childStatArray = new TableStats[]{
+         childSubQuery.getInputStats(), childSubQuery.getResultStats()
+     };
+     for (int i = 0; i < 2; i++) {
+       if (childStatArray[i] == null) {
+         continue;
+       }
+       avgRows[i] += childStatArray[i].getAvgRows();
+       numBlocks[i] += childStatArray[i].getNumBlocks();
+       numBytes[i] += childStatArray[i].getNumBytes();
+       readBytes[i] += childStatArray[i].getReadBytes();
+       numOutputs[i] += childStatArray[i].getNumShuffleOutputs();
+       numRows[i] += childStatArray[i].getNumRows();
+     }
+     // The loop above tolerates missing child stats; do the same here instead of throwing an NPE.
+     if (childStatArray[1] != null) {
+       columnStatses.addAll(childStatArray[1].getColumnStats());
+     }
+   }
+
+   for (int i = 0; i < 2; i++) {
+     stat[i].setNumBlocks(numBlocks[i]);
+     stat[i].setNumBytes(numBytes[i]);
+     stat[i].setReadBytes(readBytes[i]);
+     stat[i].setNumShuffleOutputs(numOutputs[i]);
+     stat[i].setNumRows(numRows[i]);
+     stat[i].setAvgRows(avgRows[i]);
+   }
+   stat[1].setColumnStats(columnStatses);
+
+   return stat;
+ }
+
+ /**
+  * Aggregates the per-task statistics of this subquery.
+  *
+  * @return a two-element array: index 0 holds the aggregated input stats of the units' last
+  *         attempts (units without an attempt or without input stats are skipped), index 1 the
+  *         aggregated result stats of all query units
+  */
+ private TableStats[] computeStatFromTasks() {
+   List<TableStats> inputStatsList = Lists.newArrayList();
+   List<TableStats> resultStatsList = Lists.newArrayList();
+   for (QueryUnit unit : getQueryUnits()) {
+     resultStatsList.add(unit.getStats());
+     // A unit may have no attempt yet (getProgress() guards against the same case).
+     if (unit.getLastAttempt() != null && unit.getLastAttempt().getInputStats() != null) {
+       inputStatsList.add(unit.getLastAttempt().getInputStats());
+     }
+   }
+   TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList);
+   TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList);
+   return new TableStats[]{inputStats, resultStats};
+ }
+
+ /**
+  * Stops the task scheduler if one has been created.
+  * Per the original design note, live TaskRunners are then told to die ('shouldDie') through the
+  * responses to their subsequent task requests.
+  */
+ private void stopScheduler() {
+   AbstractTaskScheduler scheduler = this.taskScheduler;
+   if (scheduler == null) {
+     return;
+   }
+   scheduler.stop();
+ }
+
+ /** Requests remote cleanup (kill) of every container currently tracked by this subquery. */
+ private void releaseContainers() {
+   TaskRunnerGroupEvent cleanupEvent =
+       new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values());
+   eventHandler.handle(cleanupEvent);
+ }
+
+ public void releaseContainer(ContainerId containerId) {
+ // try to kill the container.
+ ArrayList<Container> list = new ArrayList<Container>();
+ list.add(containers.get(containerId));
+ eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), list));
+ }
+
+ /**
+  * Computes the final input/result statistics and derives the output schema and table meta.
+  *
+  * <p>Union blocks sum their children's statistics; other blocks aggregate their tasks'.
+  * The output schema comes from the first outgoing channel. The store type defaults to CSV
+  * unless the block's plan contains a STORE node (i.e., CREATE or INSERT OVERWRITE).
+  */
+ private void finalizeStats() {
+   TableStats[] statsArray = block.hasUnion() ? computeStatFromUnionBlock(this) : computeStatFromTasks();
+
+   DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
+
+   StoreTableNode storeTableNode = PlannerUtil.findTopNode(getBlock().getPlan(), NodeType.STORE);
+   CatalogProtos.StoreType storeType =
+       (storeTableNode != null) ? storeTableNode.getStorageType() : CatalogProtos.StoreType.CSV;
+
+   this.schema = channel.getSchema();
+   this.meta = CatalogUtil.newTableMeta(storeType, new Options());
+   this.inputStatistics = statsArray[0];
+   this.resultStatistics = statsArray[1];
+ }
+
+ /**
+  * Feeds {@code event} into the state machine while holding the write lock.
+  *
+  * <p>An event that is invalid in the current state is logged and converted into an
+  * SQ_INTERNAL_ERROR event. An invalid SQ_INTERNAL_ERROR is only logged and not re-posted.
+  * Re-posting it could loop endlessly in a state without an SQ_INTERNAL_ERROR arc, e.g. KILLED,
+  * which registers no transitions at all.
+  */
+ @Override
+ public void handle(SubQueryEvent event) {
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType() + ", preState=" + getState());
+   }
+
+   // Lock outside the try block so that a failing lock() never leads to an unlock() of a lock we do not hold.
+   writeLock.lock();
+   try {
+     SubQueryState oldState = getState();
+     try {
+       getStateMachine().doTransition(event.getType(), event);
+     } catch (InvalidStateTransitonException e) {
+       LOG.error("Can't handle this event at current state", e);
+       if (event.getType() != SubQueryEventType.SQ_INTERNAL_ERROR) {
+         eventHandler.handle(new SubQueryEvent(getId(),
+             SubQueryEventType.SQ_INTERNAL_ERROR));
+       }
+     }
+
+     // Log the state change, if any.
+     if (LOG.isDebugEnabled()) {
+       SubQueryState newState = getState();
+       if (oldState != newState) {
+         LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to " + newState);
+       }
+     }
+   } finally {
+     writeLock.unlock();
+   }
+ }
+
+ public void handleTaskRequestEvent(TaskRequestEvent event) {
+ taskScheduler.handleTaskRequestEvent(event);
+ }
+
+ /**
+  * Handles SQ_INIT in the NEW state.
+  *
+  * <p>Union blocks only finalize their statistics and go to SUCCEEDED. Other blocks configure the
+  * outgoing shuffle, create and fill the task scheduler, and then either:
+  * finish immediately (SUCCEEDED) when nothing was scheduled, or start the scheduler, request
+  * containers and go to INITED. Any exception moves the subquery to ERROR.
+  */
+ private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
+     SubQueryEvent, SubQueryState> {
+
+   @Override
+   public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+     subQuery.setStartTime();
+     ExecutionBlock execBlock = subQuery.getBlock();
+
+     try {
+       // Union operator does not require actual query processing. It is performed logically.
+       if (execBlock.hasUnion()) {
+         subQuery.finalizeStats();
+         return SubQueryState.SUCCEEDED;
+       }
+
+       ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
+       DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
+       setShuffleIfNecessary(subQuery, channel);
+       initTaskScheduler(subQuery);
+       schedule(subQuery);
+       subQuery.totalScheduledObjectsCount = subQuery.getTaskScheduler().remainingScheduledObjectNum();
+       LOG.info(subQuery.totalScheduledObjectsCount + " objects are scheduled");
+
+       if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
+         subQuery.stopScheduler();
+         subQuery.finalizeStats();
+         subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED));
+         return SubQueryState.SUCCEEDED;
+       } else {
+         subQuery.taskScheduler.start();
+         allocateContainers(subQuery);
+         return SubQueryState.INITED;
+       }
+     } catch (Exception e) {
+       LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e);
+       subQuery.setFinishTime();
+       subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage()));
+       subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.ERROR));
+       return SubQueryState.ERROR;
+     }
+   }
+
+   /** Creates the scheduler context and the task scheduler chosen by TaskSchedulerFactory, then initializes it. */
+   private void initTaskScheduler(SubQuery subQuery) throws IOException {
+     TajoConf conf = subQuery.context.getConf();
+     subQuery.schedulerContext = new TaskSchedulerContext(subQuery.context,
+         subQuery.getMasterPlan().isLeaf(subQuery.getId()), subQuery.getId());
+     subQuery.taskScheduler = TaskSchedulerFactory.get(conf, subQuery.schedulerContext, subQuery);
+     subQuery.taskScheduler.init(conf);
+     LOG.info(subQuery.taskScheduler.getName() + " is chosen for the task scheduling for " + subQuery.getId());
+   }
+
+   /**
+    * If a parent block requires a repartition operation, the method sets proper repartition
+    * methods and the number of partitions to a given subquery.
+    */
+   private static void setShuffleIfNecessary(SubQuery subQuery, DataChannel channel) {
+     if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
+       int numTasks = calculateShuffleOutputNum(subQuery, channel);
+       Repartitioner.setShuffleOutputNumForTwoPhase(subQuery, numTasks, channel);
+     }
+   }
+
+   /**
+    * Sums the memory of all workers known to the query master.
+    *
+    * @param subQuery the subquery whose context gives access to the worker list
+    * @return the total memory of the cluster in megabytes
+    */
+   private static int getClusterTotalMemory(SubQuery subQuery) {
+     List<TajoMasterProtocol.WorkerResourceProto> workers =
+         subQuery.context.getQueryMasterContext().getQueryMaster().getAllWorker();
+
+     int totalMem = 0;
+     for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+       totalMem += worker.getMemoryMB();
+     }
+     return totalMem;
+   }
+
+   /**
+    * Determines the number of shuffle output partitions according to the volume of input data.
+    * This method is only used to determine the partition key number of hash join or aggregation.
+    *
+    * @param subQuery the subquery producing the shuffle output
+    * @param channel  the outgoing data channel (not used by the computation)
+    * @return the number of partitions; always at least 1, even for empty inputs
+    */
+   public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel channel) {
+     TajoConf conf = subQuery.context.getConf();
+     MasterPlan masterPlan = subQuery.getMasterPlan();
+     ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock());
+
+     GroupbyNode grpNode = null;
+     if (parent != null) {
+       grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
+     }
+
+     // Is this subquery the first step of join?
+     if (parent != null && parent.getScanNodes().length == 2) {
+       List<ExecutionBlock> childs = masterPlan.getChilds(parent);
+
+       // for outer
+       ExecutionBlock outer = childs.get(0);
+       long outerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, outer);
+
+       // for inner
+       ExecutionBlock inner = childs.get(1);
+       long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
+       LOG.info(subQuery.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+           + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
+
+       long bigger = Math.max(outerVolume, innerVolume);
+
+       int mb = (int) Math.ceil((double) bigger / 1048576);
+       LOG.info(subQuery.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
+
+       int taskNum = (int) Math.ceil((double) mb /
+           conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME));
+
+       int totalMem = getClusterTotalMemory(subQuery);
+       LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
+       int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
+
+       // determine the number of task; an empty input (0 MB) must still yield one partition
+       taskNum = Math.max(1, Math.min(taskNum, slots));
+       LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum);
+
+       return taskNum;
+
+     } else if (grpNode != null) {
+       // This subquery is the first step of group-by.
+       if (grpNode.getGroupingColumns().length == 0) {
+         return 1;
+       } else {
+         long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
+
+         int mb = (int) Math.ceil((double) volume / 1048576);
+         LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
+         // determine the number of task
+         int taskNumBySize = (int) Math.ceil((double) mb /
+             conf.getIntVar(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME));
+
+         int totalMem = getClusterTotalMemory(subQuery);
+
+         LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
+         int slots = Math.max(totalMem / conf.getIntVar(ConfVars.TASK_DEFAULT_MEMORY), 1);
+         // Maximum partitions is bounded by slots; an empty input must still yield one partition
+         int taskNum = Math.max(1, Math.min(taskNumBySize, slots));
+         LOG.info(subQuery.getId() + ", The determined number of aggregation partitions is " + taskNum);
+         return taskNum;
+       }
+     } else {
+       LOG.info("============>>>>> Unexpected Case! <<<<<================");
+       long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block);
+
+       int mb = (int) Math.ceil((double) volume / 1048576);
+       LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
+       // determine the number of task per 128MB (at least one)
+       int taskNum = Math.max(1, (int) Math.ceil((double) mb / 128));
+       LOG.info(subQuery.getId() + ", The determined number of partitions is " + taskNum);
+       return taskNum;
+     }
+   }
+
+   /**
+    * Schedules the subquery's work on its task scheduler. Leaf blocks with a single scan schedule
+    * file fragments, join blocks delegate to the join repartitioner, and everything else
+    * (e.g. sort or aggregation) is scheduled as non-leaf tasks.
+    */
+   private static void schedule(SubQuery subQuery) throws IOException {
+     MasterPlan masterPlan = subQuery.getMasterPlan();
+     ExecutionBlock execBlock = subQuery.getBlock();
+     if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
+       scheduleFragmentsForLeafQuery(subQuery);
+     } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
+       Repartitioner.scheduleFragmentsForJoinQuery(subQuery.schedulerContext, subQuery);
+     } else { // Case 3: Others (Sort or Aggregation)
+       int numTasks = getNonLeafTaskNum(subQuery);
+       Repartitioner.scheduleFragmentsForNonLeafTasks(subQuery.schedulerContext, masterPlan, subQuery, numTasks);
+     }
+   }
+
+   /**
+    * Determines the number of non-leaf tasks according to the volume of input data:
+    * one task per 64 MB, at least one.
+    *
+    * @param subQuery the non-leaf subquery
+    * @return the number of tasks, always at least 1
+    */
+   public static int getNonLeafTaskNum(SubQuery subQuery) {
+     // Getting intermediate data size
+     long volume = getInputVolume(subQuery.getMasterPlan(), subQuery.context, subQuery.getBlock());
+
+     int mb = (int) Math.ceil((double) volume / 1048576);
+     LOG.info("Table's volume is approximately " + mb + " MB");
+     // determine the number of task per 64MB
+     int maxTaskNum = Math.max(1, (int) Math.ceil((double) mb / 64));
+     LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
+     return maxTaskNum;
+   }
+
+   /**
+    * Estimates the input volume, in bytes, of {@code execBlock}.
+    *
+    * <p>For a leaf block this is the largest {@code numBytes} statistic among its scanned tables.
+    * Tables without statistics count as 0 instead of raising a NullPointerException.
+    * For a non-leaf block it is the sum over its children: a child subquery that has SUCCEEDED
+    * contributes its result size, any other child its own estimated input volume (recursively).
+    */
+   public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context,
+                                     ExecutionBlock execBlock) {
+     Map<String, TableDesc> tableMap = context.getTableDescMap();
+     if (masterPlan.isLeaf(execBlock)) {
+       ScanNode[] outerScans = execBlock.getScanNodes();
+       long maxVolume = 0;
+       for (ScanNode eachScanNode : outerScans) {
+         TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats();
+         if (stat != null && stat.getNumBytes() > maxVolume) {
+           maxVolume = stat.getNumBytes();
+         }
+       }
+       return maxVolume;
+     } else {
+       long aggregatedVolume = 0;
+       for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
+         SubQuery subquery = context.getSubQuery(childBlock.getId());
+         if (subquery == null || subquery.getState() != SubQueryState.SUCCEEDED) {
+           aggregatedVolume += getInputVolume(masterPlan, context, childBlock);
+         } else {
+           aggregatedVolume += subquery.getResultStats().getNumBytes();
+         }
+       }
+
+       return aggregatedVolume;
+     }
+   }
+
+   /**
+    * Sends a CONTAINER_REQ allocation event for this subquery. The number of containers is
+    * computed by the resource allocator from the estimated task number.
+    */
+   public static void allocateContainers(SubQuery subQuery) {
+     ExecutionBlock execBlock = subQuery.getBlock();
+
+     //TODO consider disk slot
+     int requiredMemoryMBPerTask = 512;
+
+     int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
+         subQuery.getContext().getQueryMasterContext().getWorkerContext(),
+         subQuery.schedulerContext.getEstimatedTaskNum(),
+         requiredMemoryMBPerTask
+     );
+
+     final Resource resource = Records.newRecord(Resource.class);
+
+     resource.setMemory(requiredMemoryMBPerTask);
+
+     LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
+
+     Priority priority = Records.newRecord(Priority.class);
+     priority.setPriority(subQuery.getPriority());
+     ContainerAllocationEvent event =
+         new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
+             subQuery.getId(), priority, resource, numRequest,
+             subQuery.masterPlan.isLeaf(execBlock), 0.0f);
+     subQuery.eventHandler.handle(event);
+   }
+
+   /**
+    * Creates fragments for the single scan of a leaf block, schedules them, and sets the
+    * scheduler's task size and estimated task number.
+    */
+   private static void scheduleFragmentsForLeafQuery(SubQuery subQuery) throws IOException {
+     ExecutionBlock execBlock = subQuery.getBlock();
+     ScanNode[] scans = execBlock.getScanNodes();
+     Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+     ScanNode scan = scans[0];
+     TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
+
+     Collection<FileFragment> fragments;
+     TableMeta meta = table.getMeta();
+
+     // Depending on scanner node's type, it creates fragments. If scan is for
+     // a partitioned table, It will creates lots fragments for all partitions.
+     // Otherwise, it creates at least one fragments for a table, which may
+     // span a number of blocks or possibly consists of a number of files.
+     if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+       fragments = Repartitioner.getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, table);
+     } else {
+       Path inputPath = table.getPath();
+       fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
+     }
+
+     SubQuery.scheduleFragments(subQuery, fragments);
+     if (subQuery.getTaskScheduler() instanceof DefaultTaskScheduler) {
+       //Leaf task of DefaultTaskScheduler should be fragment size
+       // EstimatedTaskNum determined number of initial container
+       subQuery.schedulerContext.setTaskSize(fragments.size());
+       subQuery.schedulerContext.setEstimatedTaskNum(fragments.size());
+     } else {
+       TajoConf conf = subQuery.context.getConf();
+       subQuery.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024);
+       int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() /
+           (double) subQuery.schedulerContext.getTaskSize());
+       subQuery.schedulerContext.setEstimatedTaskNum(estimatedTaskNum);
+     }
+   }
+ }
+
+ public static void scheduleFragment(SubQuery subQuery, FileFragment fragment) {
+ subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+ subQuery.getId(), fragment));
+ }
+
+
+ /**
+  * Schedules every fragment of the collection, in iteration order, with one single-fragment
+  * schedule event each.
+  *
+  * @param subQuery the sub query owning the task scheduler
+  * @param fragments the fragments to schedule
+  */
+ public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> fragments) {
+   for (FileFragment fragment : fragments) {
+     scheduleFragment(subQuery, fragment);
+   }
+ }
+
+ /**
+  * Schedules each left fragment together with the whole collection of broadcast fragments.
+  *
+  * @param subQuery the sub query owning the task scheduler
+  * @param leftFragments fragments scheduled one event each
+  * @param broadcastFragments fragments attached to every one of those events
+  */
+ public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> leftFragments,
+                                      Collection<FileFragment> broadcastFragments) {
+   for (FileFragment leftFragment : leftFragments) {
+     scheduleFragment(subQuery, leftFragment, broadcastFragments);
+   }
+ }
+
+ /**
+  * Sends a left fragment and its accompanying right fragments to the task scheduler as one
+  * T_SCHEDULE {@link FragmentScheduleEvent}.
+  *
+  * @param subQuery the sub query owning the task scheduler
+  * @param leftFragment the fragment that defines the scheduled task
+  * @param rightFragments fragments that accompany the left fragment
+  */
+ public static void scheduleFragment(SubQuery subQuery,
+                                     FileFragment leftFragment, Collection<FileFragment> rightFragments) {
+   FragmentScheduleEvent scheduleEvent = new FragmentScheduleEvent(
+       TaskSchedulerEvent.EventType.T_SCHEDULE, subQuery.getId(), leftFragment, rightFragments);
+   subQuery.taskScheduler.handle(scheduleEvent);
+ }
+
+ public static void scheduleFetches(SubQuery subQuery, Map<String, List<URI>> fetches) {
+ subQuery.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
+ subQuery.getId(), fetches));
+ }
+
+ /**
+  * Creates a {@link QueryUnit} numbered {@code taskId} within the scheduler context's block,
+  * attaches the sub query's logical plan to it, and registers it with the sub query.
+  *
+  * @param schedulerContext supplies the configuration, block id and leaf flag
+  * @param queryUnitContext the attempt schedule context handed to the new query unit
+  * @param subQuery the sub query that receives the new task
+  * @param taskId the task number used to build the query unit id
+  * @return the newly created and registered query unit
+  */
+ public static QueryUnit newEmptyQueryUnit(TaskSchedulerContext schedulerContext,
+                                           QueryUnitAttemptScheduleContext queryUnitContext,
+                                           SubQuery subQuery, int taskId) {
+   QueryUnit queryUnit = new QueryUnit(
+       schedulerContext.getMasterContext().getConf(),
+       queryUnitContext,
+       QueryIdFactory.newQueryUnitId(schedulerContext.getBlockId(), taskId),
+       schedulerContext.isLeafQuery(),
+       subQuery.eventHandler);
+   queryUnit.setLogicalPlan(subQuery.getBlock().getPlan());
+   subQuery.addTask(queryUnit);
+   return queryUnit;
+ }
+
+ /**
+  * Records newly allocated containers in the sub query, asks for them to be launched, and then
+  * fires SQ_START. Any throwable is reported as a diagnostic followed by SQ_INTERNAL_ERROR.
+  */
+ private static class ContainerLaunchTransition
+     implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+   @Override
+   public void transition(SubQuery subQuery, SubQueryEvent event) {
+     try {
+       SubQueryContainerAllocationEvent allocated = (SubQueryContainerAllocationEvent) event;
+       registerContainers(subQuery, allocated);
+       LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
+
+       subQuery.eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
+           subQuery.getId(), allocated.getAllocatedContainer()));
+       subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
+     } catch (Throwable cause) {
+       subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+           ExceptionUtils.getStackTrace(cause)));
+       subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
+     }
+   }
+
+   /**
+    * Adds every allocated container to the sub query's container map. A container id that is
+    * already present is reported (diagnostic + SQ_INTERNAL_ERROR) and then overwritten.
+    */
+   private static void registerContainers(SubQuery subQuery, SubQueryContainerAllocationEvent allocated) {
+     for (Container container : allocated.getAllocatedContainer()) {
+       ContainerId containerId = container.getId();
+       if (subQuery.containers.containsKey(containerId)) {
+         subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+             "Duplicated containers are allocated: " + containerId.toString()));
+         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
+       }
+       subQuery.containers.put(containerId, container);
+     }
+   }
+ }
+
+ /**
+ * It is used in KILL_WAIT state against Contained Allocated event.
+ * It just returns allocated containers to resource manager.
+ */
+ private static class AllocatedContainersCancelTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+ @Override
+ public void transition(SubQuery subQuery, SubQueryEvent event) {
+ try {
+ SubQueryContainerAllocationEvent allocationEvent =
+ (SubQueryContainerAllocationEvent) event;
+ subQuery.eventHandler.handle(
+ new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
+ subQuery.getId(), allocationEvent.getAllocatedContainer()));
+ LOG.info(String.format("[%s] %d allocated containers are canceled",
+ subQuery.getId().toString(),
+ allocationEvent.getAllocatedContainer().size()));
+ } catch (Throwable t) {
+ subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+ ExceptionUtils.getStackTrace(t)));
+ subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
+ }
+ }
+ }
+
+ /**
+  * Handles the completion of one task of a sub query. It updates the succeeded/killed/failed
+  * counters and fires SQ_KILL as soon as a task has failed. Once every scheduled object is
+  * accounted for, it fires SQ_SUBQUERY_COMPLETED. An unknown task id fails the sub query with
+  * SQ_FAILED.
+  */
+ private static class TaskCompletedTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+   @Override
+   public void transition(SubQuery subQuery,
+                          SubQueryEvent event) {
+     SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
+     QueryUnit task = subQuery.getQueryUnit(taskEvent.getTaskId());
+
+     if (task == null) { // the completed task is not known to this sub query
+       LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
+       subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED));
+       return;
+     }
+
+     subQuery.completedTaskCount++;
+
+     // Classify every outcome by the state carried in the completion event, so the three counters
+     // are driven by the same source of truth.
+     TaskState completedState = taskEvent.getState();
+     if (completedState == TaskState.SUCCEEDED) {
+       subQuery.succeededObjectCount++;
+     } else if (completedState == TaskState.KILLED) {
+       subQuery.killedObjectCount++;
+     } else if (completedState == TaskState.FAILED) {
+       subQuery.failedObjectCount++;
+       // if at least one task is failed, try to kill all tasks.
+       subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
+     }
+
+     LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)",
+         subQuery.getId(),
+         subQuery.getTotalScheduledObjectsCount(),
+         subQuery.succeededObjectCount,
+         subQuery.killedObjectCount,
+         subQuery.failedObjectCount));
+
+     if (subQuery.totalScheduledObjectsCount ==
+         subQuery.succeededObjectCount + subQuery.killedObjectCount + subQuery.failedObjectCount) {
+       subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_SUBQUERY_COMPLETED));
+     }
+   }
+ }
+
+ private static class KillTasksTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+ @Override
+ public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+ subQuery.getTaskScheduler().stop();
+ for (QueryUnit queryUnit : subQuery.getQueryUnits()) {
+ subQuery.eventHandler.handle(new TaskEvent(queryUnit.getId(), TaskEventType.T_KILL));
+ }
+ }
+ }
+
+ /**
+  * Stops this sub query's task scheduler and then releases its containers.
+  */
+ private void cleanup() {
+   this.stopScheduler();
+   this.releaseContainers();
+ }
+
+ /**
+  * Decides the final state of a sub query once all of its scheduled objects have completed.
+  * <ul>
+  *   <li>If any task failed, the sub query is aborted as FAILED.</li>
+  *   <li>Otherwise, if any task was killed, it is aborted as KILLED.</li>
+  *   <li>Otherwise it completes as SUCCEEDED.</li>
+  * </ul>
+  * Any throwable raised while doing so aborts it as ERROR.
+  */
+ private static class SubQueryCompleteTransition
+     implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+
+   @Override
+   public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+     // TODO - Commit subQuery & do cleanup
+     // TODO - records succeeded, failed, killed completed task
+     // TODO - records metrics
+     try {
+       LOG.info(String.format("subQuery completed - %s (total=%d, success=%d, killed=%d, failed=%d)",
+           subQuery.getId().toString(),
+           subQuery.getTotalScheduledObjectsCount(),
+           subQuery.getSucceededObjectCount(),
+           subQuery.killedObjectCount,
+           subQuery.failedObjectCount));
+
+       // Failures take precedence over kills: a failed task itself triggers killing the others.
+       if (subQuery.failedObjectCount > 0) {
+         subQuery.abort(SubQueryState.FAILED);
+         return SubQueryState.FAILED;
+       } else if (subQuery.killedObjectCount > 0) {
+         subQuery.abort(SubQueryState.KILLED);
+         return SubQueryState.KILLED;
+       } else {
+         subQuery.complete();
+         return SubQueryState.SUCCEEDED;
+       }
+     } catch (Throwable t) {
+       // pass the throwable as the cause so the stack trace is logged, not just its toString()
+       LOG.error("Failed to complete subQuery " + subQuery.getId(), t);
+       subQuery.abort(SubQueryState.ERROR);
+       return SubQueryState.ERROR;
+     }
+   }
+ }
+
+ private static class DiagnosticsUpdateTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+ @Override
+ public void transition(SubQuery subQuery, SubQueryEvent event) {
+ subQuery.addDiagnostic(((SubQueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+ }
+ }
+
+ private static class InternalErrorTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
+ @Override
+ public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+ subQuery.abort(SubQueryState.ERROR);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
new file mode 100644
index 0000000..effcfde
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+/**
+ * States of a {@link SubQuery}'s state machine. The declaration order defines the ordinals, so
+ * constants should not be reordered.
+ */
+public enum SubQueryState {
+ NEW,
+ INITED,
+ RUNNING,
+ // completed with no failed and no killed tasks
+ SUCCEEDED,
+ // completed with at least one failed task
+ FAILED,
+ // NOTE(review): appears to be the state while tasks are being killed; containers allocated in this state are returned
+ KILL_WAIT,
+ // completed with killed (but no failed) tasks
+ KILLED,
+ // an internal error occurred
+ ERROR
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
new file mode 100644
index 0000000..a995058
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.tajo.QueryId;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+
+/**
+ * Context of the worker resource manager (TajoWorkerResourceManager). It holds the event
+ * dispatcher, the active and inactive workers, the query master container of each query, and
+ * the set exposed by {@link #getQueryMasterWorker()} (presumably identifiers of workers running
+ * live query masters; confirm against callers). All collections are concurrent.
+ */
+public class TajoRMContext {
+
+  /** Event dispatcher of the resource manager; package-private and also exposed via {@link #getDispatcher()}. */
+  final Dispatcher rmDispatcher;
+
+  /** Running workers, keyed by worker id. */
+  private final ConcurrentMap<String, Worker> activeWorkers = new ConcurrentHashMap<String, Worker>();
+
+  /** Inactive workers, keyed by worker id. */
+  private final ConcurrentMap<String, Worker> inactiveWorkerMap = new ConcurrentHashMap<String, Worker>();
+
+  /** Query master container id, keyed by query id. */
+  private final ConcurrentMap<QueryId, ContainerIdProto> queryMasterContainers = Maps.newConcurrentMap();
+
+  /** Concurrent set backed by a ConcurrentHashMap. */
+  private final Set<String> liveQueryMasterWorkers =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  public TajoRMContext(Dispatcher dispatcher) {
+    rmDispatcher = dispatcher;
+  }
+
+  /**
+   * @return the resource manager's event dispatcher
+   */
+  public Dispatcher getDispatcher() {
+    return this.rmDispatcher;
+  }
+
+  /**
+   * @return the live (mutable) map of active workers, keyed by worker id
+   */
+  public ConcurrentMap<String, Worker> getWorkers() {
+    return this.activeWorkers;
+  }
+
+  /**
+   * @return the live (mutable) map of inactive workers, keyed by worker id
+   */
+  public ConcurrentMap<String, Worker> getInactiveWorkers() {
+    return this.inactiveWorkerMap;
+  }
+
+  /**
+   * @return the live (mutable) map from query id to its query master container id
+   */
+  public ConcurrentMap<QueryId, ContainerIdProto> getQueryMasterContainer() {
+    return this.queryMasterContainers;
+  }
+
+  /**
+   * @return the live (mutable) concurrent set of query master worker entries
+   */
+  public Set<String> getQueryMasterWorker() {
+    return this.liveQueryMasterWorkers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
new file mode 100644
index 0000000..4bd7adb
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.ProtoUtil;
+
+import java.io.IOError;
+import java.net.InetSocketAddress;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder;
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
+
+/**
+ * It receives pings that workers periodically send. The ping messages contain the worker resources and their statuses.
+ * From ping messages, {@link TajoResourceTracker} tracks the recent status of all workers.
+ *
+ * In detail, it has two main roles as follows:
+ *
+ * <ul>
+ *   <li>Membership management for nodes which join a Tajo cluster
+ *     <ul>
+ *       <li>Register - It receives the ping from a new worker. It registers the worker.</li>
+ *       <li>Unregister - It unregisters a worker who does not send ping for some expiry time.</li>
+ *     </ul>
+ *   </li>
+ *   <li>Status Update - It updates the status of all participating workers</li>
+ * </ul>
+ *
+ * <p>{@link #heartbeat} may be invoked concurrently by the RPC server, so it keeps no per-call state in fields.</p>
+ */
+public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocolService.Interface {
+  /** Class logger */
+  private static final Log LOG = LogFactory.getLog(TajoResourceTracker.class);
+  /** the context of TajoWorkerResourceManager */
+  private final TajoRMContext rmContext;
+  /** Liveliness monitor which checks ping expiry times of workers */
+  private final WorkerLivelinessMonitor workerLivelinessMonitor;
+
+  /** RPC server for worker resource tracker */
+  private AsyncRpcServer server;
+  /** The bind address of RPC server of worker resource tracker */
+  private InetSocketAddress bindAddress;
+
+  public TajoResourceTracker(TajoRMContext rmContext, WorkerLivelinessMonitor workerLivelinessMonitor) {
+    super(TajoResourceTracker.class.getSimpleName());
+    this.rmContext = rmContext;
+    this.workerLivelinessMonitor = workerLivelinessMonitor;
+  }
+
+  /**
+   * Creates and starts the RPC server on RESOURCE_TRACKER_RPC_ADDRESS, then writes the actual bind
+   * address back into the configuration.
+   *
+   * @param conf the configuration; must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a TajoConf
+   * @throws IOError if the RPC server cannot be created
+   */
+  @Override
+  public void serviceInit(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance");
+    TajoConf systemConf = (TajoConf) conf;
+
+    String confMasterServiceAddr = systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+
+    try {
+      server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, 3);
+    } catch (Exception e) {
+      LOG.error(e);
+      throw new IOError(e);
+    }
+
+    server.start();
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    // Set actual bind address to the systemConf
+    systemConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
+
+    LOG.info("TajoResourceTracker starts up (" + this.bindAddress + ")");
+    // NOTE(review): start() is called from within serviceInit(); confirm this is intended before
+    // moving the startup logic into serviceStart().
+    super.start();
+  }
+
+  /**
+   * Shuts down the RPC server if it was created.
+   */
+  @Override
+  public void serviceStop() {
+    // server can be null if some exception occurs before the rpc server starts up.
+    if (server != null) {
+      server.shutdown();
+      server = null;
+    }
+    super.stop();
+  }
+
+  private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) {
+    return new WorkerStatusEvent(
+        workerKey,
+        heartbeat.getServerStatus().getRunningTaskNum(),
+        heartbeat.getServerStatus().getJvmHeap().getMaxHeap(),
+        heartbeat.getServerStatus().getJvmHeap().getFreeHeap(),
+        heartbeat.getServerStatus().getJvmHeap().getTotalHeap());
+  }
+
+  /**
+   * Handles a worker ping. The outcome depends on the worker's current status:
+   * <ul>
+   *   <li>Known, running worker: its status is updated and its ping refreshed.</li>
+   *   <li>Previously inactive worker: it is re-created as a new active worker.</li>
+   *   <li>Unknown worker: it is registered, or reconnected if another heartbeat registered it concurrently.</li>
+   * </ul>
+   * The callback always receives a response carrying the current cluster resource summary.
+   */
+  @Override
+  public void heartbeat(
+      RpcController controller,
+      NodeHeartbeat heartbeat,
+      RpcCallback<TajoHeartbeatResponse> done) {
+
+    try {
+      // get a workerId from the heartbeat
+      String workerId = createWorkerId(heartbeat);
+
+      if (rmContext.getWorkers().containsKey(workerId)) { // if worker is running
+
+        // status update
+        rmContext.getDispatcher().getEventHandler().handle(createStatusEvent(workerId, heartbeat));
+        // refresh ping
+        workerLivelinessMonitor.receivedPing(workerId);
+
+      } else if (rmContext.getInactiveWorkers().containsKey(workerId)) { // worker was inactive
+
+        // remove the inactive worker from the list of inactive workers.
+        Worker worker = rmContext.getInactiveWorkers().remove(workerId);
+        // it may already have been removed by a concurrent heartbeat between containsKey() and remove()
+        if (worker != null) {
+          workerLivelinessMonitor.unregister(worker.getWorkerId());
+        }
+
+        // create new worker instance
+        Worker newWorker = createWorkerResource(heartbeat);
+        String newWorkerId = newWorker.getWorkerId();
+        // add the new worker to the list of active workers
+        rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker);
+
+        // Transit the worker to RUNNING
+        rmContext.getDispatcher().getEventHandler().handle(new WorkerEvent(newWorkerId, WorkerEventType.STARTED));
+        // register the worker to the liveliness monitor
+        workerLivelinessMonitor.register(newWorkerId);
+
+      } else { // if new worker pings firstly
+
+        // create new worker instance
+        Worker newWorker = createWorkerResource(heartbeat);
+        Worker oldWorker = rmContext.getWorkers().putIfAbsent(workerId, newWorker);
+
+        if (oldWorker == null) {
+          // Transit the worker to RUNNING
+          rmContext.getDispatcher().getEventHandler().handle(new WorkerEvent(workerId, WorkerEventType.STARTED));
+        } else {
+          LOG.info("Reconnect from the node at: " + workerId);
+          workerLivelinessMonitor.unregister(workerId);
+          rmContext.getDispatcher().getEventHandler().handle(new WorkerReconnectEvent(workerId, newWorker));
+        }
+
+        workerLivelinessMonitor.register(workerId);
+      }
+
+    } finally {
+      // A fresh builder per call: protobuf builders are mutable and not thread-safe, so a shared
+      // builder could hand one caller another caller's summary.
+      Builder response = TajoHeartbeatResponse.newBuilder()
+          .setHeartbeatResult(ProtoUtil.TRUE)
+          .setClusterResourceSummary(getClusterResourceSummary());
+      done.run(response.build());
+    }
+  }
+
+  private static String createWorkerId(NodeHeartbeat heartbeat) {
+    return heartbeat.getTajoWorkerHost() + ":" + heartbeat.getTajoQueryMasterPort() + ":" + heartbeat.getPeerRpcPort();
+  }
+
+  /**
+   * Builds a {@link Worker} from a heartbeat. Resource figures come from the heartbeat's server
+   * status when present. Otherwise fixed defaults are used: 4096 MB memory, 4 disk slots and
+   * 4 CPU core slots.
+   */
+  private Worker createWorkerResource(NodeHeartbeat request) {
+    boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
+    boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
+
+    WorkerResource workerResource = new WorkerResource();
+    workerResource.setQueryMasterMode(queryMasterMode);
+    workerResource.setTaskRunnerMode(taskRunnerMode);
+
+    // Protobuf getters never return null (an absent message yields its default instance), so
+    // presence must be tested with hasServerStatus() for the fallback below to be reachable.
+    if (request.hasServerStatus()) {
+      workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB());
+      workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
+      workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
+      workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
+      workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap());
+      workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
+      workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
+    } else {
+      workerResource.setMemoryMB(4096);
+      workerResource.setDiskSlots(4);
+      workerResource.setCpuCoreSlots(4);
+    }
+
+    Worker worker = new Worker(rmContext, workerResource);
+    worker.setHostName(request.getTajoWorkerHost());
+    worker.setHttpPort(request.getTajoWorkerHttpPort());
+    worker.setPeerRpcPort(request.getPeerRpcPort());
+    worker.setQueryMasterPort(request.getTajoQueryMasterPort());
+    worker.setClientPort(request.getTajoWorkerClientPort());
+    worker.setPullServerPort(request.getTajoWorkerPullServerPort());
+    return worker;
+  }
+
+  /**
+   * Sums the total and available memory, disk slots and CPU core slots over all active workers.
+   *
+   * @return the cluster resource summary, including the number of workers that were summed
+   */
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+    int numWorkers = 0;
+
+    int totalDiskSlots = 0;
+    int totalCpuCoreSlots = 0;
+    int totalMemoryMB = 0;
+
+    int totalAvailableDiskSlots = 0;
+    int totalAvailableCpuCoreSlots = 0;
+    int totalAvailableMemoryMB = 0;
+
+    synchronized (rmContext) {
+      // Iterating values() avoids a second lookup that could return null if a worker were removed
+      // concurrently; ConcurrentHashMap never yields null values.
+      for (Worker worker : rmContext.getWorkers().values()) {
+        WorkerResource resource = worker.getResource();
+
+        totalMemoryMB += resource.getMemoryMB();
+        totalAvailableMemoryMB += resource.getAvailableMemoryMB();
+
+        totalDiskSlots += resource.getDiskSlots();
+        totalAvailableDiskSlots += resource.getAvailableDiskSlots();
+
+        totalCpuCoreSlots += resource.getCpuCoreSlots();
+        totalAvailableCpuCoreSlots += resource.getAvailableCpuCoreSlots();
+
+        numWorkers++;
+      }
+    }
+
+    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+        .setNumWorkers(numWorkers)
+        .setTotalCpuCoreSlots(totalCpuCoreSlots)
+        .setTotalDiskSlots(totalDiskSlots)
+        .setTotalMemoryMB(totalMemoryMB)
+        .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
+        .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
+        .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
new file mode 100644
index 0000000..4d6cbd2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.api.records.*;
+
+/**
+ * A YARN {@link Container} associated with a Tajo {@link Worker}.
+ *
+ * <p>Besides the container id, node id and worker, the remaining Container attributes (node HTTP
+ * address, resource, priority, container token) are stored as given so that values set through
+ * the Container API can be read back; each is {@code null} until set.</p>
+ */
+public class TajoWorkerContainer extends Container {
+  ContainerId id;
+  NodeId nodeId;
+  Worker worker;
+
+  private String nodeHttpAddress;
+  private Resource resource;
+  private Priority priority;
+  private Token containerToken;
+
+  /** @return the worker associated with this container */
+  public Worker getWorkerResource() {
+    return worker;
+  }
+
+  /** @param workerResource the worker to associate with this container */
+  public void setWorkerResource(Worker workerResource) {
+    this.worker = workerResource;
+  }
+
+  @Override
+  public ContainerId getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(ContainerId id) {
+    this.id = id;
+  }
+
+  @Override
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+
+  @Override
+  public void setNodeId(NodeId nodeId) {
+    this.nodeId = nodeId;
+  }
+
+  @Override
+  public String getNodeHttpAddress() {
+    return nodeHttpAddress;
+  }
+
+  @Override
+  public void setNodeHttpAddress(String nodeHttpAddress) {
+    this.nodeHttpAddress = nodeHttpAddress;
+  }
+
+  @Override
+  public Resource getResource() {
+    return resource;
+  }
+
+  @Override
+  public void setResource(Resource resource) {
+    this.resource = resource;
+  }
+
+  @Override
+  public Priority getPriority() {
+    return priority;
+  }
+
+  @Override
+  public void setPriority(Priority priority) {
+    this.priority = priority;
+  }
+
+  @Override
+  public Token getContainerToken() {
+    return containerToken;
+  }
+
+  @Override
+  public void setContainerToken(Token containerToken) {
+    this.containerToken = containerToken;
+  }
+
+  /**
+   * NOTE(review): always returns 0, so every TajoWorkerContainer compares equal to any Container.
+   * Do not rely on this ordering (e.g. in sorted collections).
+   */
+  @Override
+  public int compareTo(Container container) {
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
new file mode 100644
index 0000000..634ad2b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+
+/**
+ * A {@link ContainerId} that keeps its application attempt id and container number in plain
+ * fields, with helpers to convert container ids to {@code YarnProtos.ContainerIdProto}.
+ */
+public class TajoWorkerContainerId extends ContainerId {
+  ApplicationAttemptId applicationAttemptId;
+  int id;
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  @Override
+  public void setApplicationAttemptId(ApplicationAttemptId atId) {
+    this.applicationAttemptId = atId;
+  }
+
+  @Override
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  /**
+   * Converts this id into its protobuf form. The application attempt id must have been set.
+   *
+   * @return the equivalent {@code ContainerIdProto}
+   */
+  public YarnProtos.ContainerIdProto getProto() {
+    return toProto(applicationAttemptId, id);
+  }
+
+  /**
+   * Converts any {@link ContainerId} into its protobuf form.
+   *
+   * @param containerId the id to convert; its application attempt id must be non-null
+   * @return the equivalent {@code ContainerIdProto}
+   */
+  public static YarnProtos.ContainerIdProto getContainerIdProto(ContainerId containerId) {
+    if (containerId instanceof TajoWorkerContainerId) {
+      return ((TajoWorkerContainerId) containerId).getProto();
+    }
+    return toProto(containerId.getApplicationAttemptId(), containerId.getId());
+  }
+
+  /**
+   * Builds a {@code ContainerIdProto} from an application attempt id and a container number.
+   * Both the app id and the app attempt id fields of the proto are populated.
+   */
+  private static YarnProtos.ContainerIdProto toProto(ApplicationAttemptId attemptId, int containerNumber) {
+    YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
+        .setClusterTimestamp(attemptId.getApplicationId().getClusterTimestamp())
+        .setId(attemptId.getApplicationId().getId())
+        .build();
+
+    YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
+        .setAttemptId(attemptId.getAttemptId())
+        .setApplicationId(appIdProto)
+        .build();
+
+    return YarnProtos.ContainerIdProto.newBuilder()
+        .setAppAttemptId(attemptIdProto)
+        .setAppId(appIdProto)
+        .setId(containerNumber)
+        .build();
+  }
+
+  /** No-op: this implementation keeps plain fields rather than a protobuf builder. */
+  @Override
+  protected void build() {
+
+  }
+}