You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:03 UTC
[09/51] [partial] TAJO-22: The package prefix should be
org.apache.tajo. (DaeMyung Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
deleted file mode 100644
index d353733..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ /dev/null
@@ -1,766 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.Records;
-import tajo.QueryIdFactory;
-import tajo.QueryUnitId;
-import tajo.SubQueryId;
-import tajo.catalog.CatalogService;
-import tajo.catalog.CatalogUtil;
-import tajo.catalog.TableDesc;
-import tajo.catalog.TableMeta;
-import tajo.catalog.statistics.ColumnStat;
-import tajo.catalog.statistics.StatisticsUtil;
-import tajo.catalog.statistics.TableStat;
-import tajo.conf.TajoConf;
-import tajo.engine.planner.PlannerUtil;
-import tajo.engine.planner.logical.ExprType;
-import tajo.engine.planner.logical.GroupbyNode;
-import tajo.engine.planner.logical.ScanNode;
-import tajo.engine.planner.logical.StoreTableNode;
-import tajo.master.QueryMaster.QueryContext;
-import tajo.master.TaskRunnerGroupEvent.EventType;
-import tajo.master.event.*;
-import tajo.storage.Fragment;
-import tajo.storage.StorageManager;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static tajo.conf.TajoConf.ConfVars;
-
-
-/**
- * SubQuery plays a role in controlling an ExecutionBlock and is a finite state machine.
- */
-public class SubQuery implements EventHandler<SubQueryEvent> {
-
- private static final Log LOG = LogFactory.getLog(SubQuery.class);
-
- private ExecutionBlock block;
- private int priority;
- private TableMeta meta;
- private EventHandler eventHandler;
- private final StorageManager sm;
- private TaskSchedulerImpl taskScheduler;
- private QueryContext context;
-
- private long startTime;
- private long finishTime;
-
- volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>();
- volatile Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
-
-
- private static ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
- private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent>
- stateMachine;
-
- private StateMachineFactory<SubQuery, SubQueryState,
- SubQueryEventType, SubQueryEvent> stateMachineFactory =
- new StateMachineFactory <SubQuery, SubQueryState,
- SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
-
- .addTransition(SubQueryState.NEW,
- EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
- SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
-
- .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
-
- .addTransition(SubQueryState.CONTAINER_ALLOCATED,
- EnumSet.of(SubQueryState.RUNNING, SubQueryState.FAILED,
- SubQueryState.SUCCEEDED), SubQueryEventType.SQ_START, new StartTransition())
- .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
-
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, SubQueryEventType.SQ_START)
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
- SubQueryEventType.SQ_TASK_COMPLETED, new TaskCompletedTransition())
- .addTransition(SubQueryState.RUNNING, SubQueryState.SUCCEEDED,
- SubQueryEventType.SQ_SUBQUERY_COMPLETED, new SubQueryCompleteTransition())
- .addTransition(SubQueryState.RUNNING, SubQueryState.FAILED,
- SubQueryEventType.SQ_FAILED, new InternalErrorTransition())
-
- .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
- SubQueryEventType.SQ_START)
- .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED)
-
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_START)
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED)
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_FAILED)
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_INTERNAL_ERROR);
-
-
- private final Lock readLock;
- private final Lock writeLock;
-
- private int completedTaskCount = 0;
-
- public SubQuery(QueryContext context, ExecutionBlock block, StorageManager sm) {
- this.context = context;
- this.block = block;
- this.sm = sm;
- this.eventHandler = context.getEventHandler();
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
- stateMachine = stateMachineFactory.make(this);
- }
-
- public QueryContext getContext() {
- return context;
- }
-
- public EventHandler getEventHandler() {
- return eventHandler;
- }
-
- public TaskScheduler getTaskScheduler() {
- return taskScheduler;
- }
-
- public void setStartTime() {
- startTime = context.getClock().getTime();
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public long getStartTime() {
- return this.startTime;
- }
-
- public void setFinishTime() {
- finishTime = context.getClock().getTime();
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public long getFinishTime() {
- return this.finishTime;
- }
-
- public float getProgress() {
- readLock.lock();
- try {
- if (getState() == SubQueryState.NEW) {
- return 0;
- } else {
- if (completedTaskCount == 0) {
- return 0.0f;
- } else {
- return (float)completedTaskCount / (float)tasks.size();
- }
- }
- } finally {
- readLock.unlock();
- }
- }
-
- public ExecutionBlock getBlock() {
- return block;
- }
-
- public void addTask(QueryUnit task) {
- tasks.put(task.getId(), task);
- }
-
- public void abortSubQuery(SubQueryState finalState) {
- // TODO -
- // - committer.abortSubQuery(...)
- // - record SubQuery Finish Time
- // - CleanUp Tasks
- // - Record History
-
- eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
- }
-
- public StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> getStateMachine() {
- return this.stateMachine;
- }
-
- public void setPriority(int priority) {
- this.priority = priority;
- }
-
-
- public int getPriority() {
- return this.priority;
- }
-
- public StorageManager getStorageManager() {
- return sm;
- }
-
- public SubQuery getChildQuery(ScanNode scanForChild) {
- return context.getSubQuery(block.getChildBlock(scanForChild).getId());
- }
-
- public SubQueryId getId() {
- return block.getId();
- }
-
- public QueryUnit[] getQueryUnits() {
- return tasks.values().toArray(new QueryUnit[tasks.size()]);
- }
-
- public QueryUnit getQueryUnit(QueryUnitId qid) {
- return tasks.get(qid);
- }
-
- public void setTableMeta(TableMeta meta) {
- this.meta = meta;
- }
-
- @SuppressWarnings("UnusedDeclaration")
- public TableMeta getTableMeta() {
- return meta;
- }
-
- public TableStat getTableStat() {
- return this.meta.getStat();
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(this.getId());
- return sb.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof SubQuery) {
- SubQuery other = (SubQuery)o;
- return getId().equals(other.getId());
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return getId().hashCode();
- }
-
- public int compareTo(SubQuery other) {
- return getId().compareTo(other.getId());
- }
-
- public SubQueryState getState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- private static TableStat computeStatFromUnionBlock(SubQuery unit) {
- TableStat stat = new TableStat();
- TableStat childStat;
- long avgRows = 0, numBytes = 0, numRows = 0;
- int numBlocks = 0, numPartitions = 0;
- List<ColumnStat> columnStats = Lists.newArrayList();
-
- Iterator<ExecutionBlock> it = unit.getBlock().getChildBlocks().iterator();
- while (it.hasNext()) {
- ExecutionBlock block = it.next();
- SubQuery childSubQuery = unit.context.getSubQuery(block.getId());
- childStat = childSubQuery.getTableStat();
- avgRows += childStat.getAvgRows();
- columnStats.addAll(childStat.getColumnStats());
- numBlocks += childStat.getNumBlocks();
- numBytes += childStat.getNumBytes();
- numPartitions += childStat.getNumPartitions();
- numRows += childStat.getNumRows();
- }
-
- stat.setColumnStats(columnStats);
- stat.setNumBlocks(numBlocks);
- stat.setNumBytes(numBytes);
- stat.setNumPartitions(numPartitions);
- stat.setNumRows(numRows);
- stat.setAvgRows(avgRows);
- return stat;
- }
-
- public TableMeta buildTableMeta() throws IOException {
- finishTime = context.getClock().getTime();
-
- TableStat stat;
- if (block.hasUnion()) {
- stat = computeStatFromUnionBlock(this);
- } else {
- stat = computeStatFromTasks();
- }
- TableMeta meta = writeStat(this, stat);
- meta.setStat(stat);
- setTableMeta(meta);
- return meta;
- }
-
- private TableStat computeStatFromTasks() {
- List<TableStat> stats = Lists.newArrayList();
- for (QueryUnit unit : getQueryUnits()) {
- stats.add(unit.getStats());
- }
- TableStat tableStat = StatisticsUtil.aggregateTableStat(stats);
- return tableStat;
- }
-
- private TableMeta writeStat(SubQuery subQuery, TableStat stat)
- throws IOException {
- ExecutionBlock execBlock = subQuery.getBlock();
- StoreTableNode storeTableNode = execBlock.getStoreTableNode();
- TableMeta meta = toTableMeta(storeTableNode);
- meta.setStat(stat);
- sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
- return meta;
- }
-
- private static TableMeta toTableMeta(StoreTableNode store) {
- if (store.hasOptions()) {
- return CatalogUtil.newTableMeta(store.getOutSchema(),
- store.getStorageType(), store.getOptions());
- } else {
- return CatalogUtil.newTableMeta(store.getOutSchema(),
- store.getStorageType());
- }
- }
-
- private void stopScheduler() {
- // If there are launched TaskRunners, send the 'shouldDie' message to all r
- // via received task requests.
- if (taskScheduler != null) {
- taskScheduler.stop();
- }
- }
-
- private void releaseContainers() {
- // If there are still live TaskRunners, try to kill the containers.
- eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP ,getId(),
- containers.values()));
- }
-
- private void finish() {
- TableMeta meta = null;
- try {
- meta = buildTableMeta();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- setTableMeta(meta);
- setFinishTime();
- eventHandler.handle(new SubQuerySucceeEvent(getId(), meta));
- }
-
- @Override
- public void handle(SubQueryEvent event) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType());
- }
-
- try {
- writeLock.lock();
- SubQueryState oldState = getState();
- try {
- getStateMachine().doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state", e);
- eventHandler.handle(new SubQueryEvent(getId(),
- SubQueryEventType.SQ_INTERNAL_ERROR));
- }
-
- // notify the eventhandler of state change
- if (LOG.isDebugEnabled()) {
- if (oldState != getState()) {
- LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to "
- + getState());
- }
- }
- }
-
- finally {
- writeLock.unlock();
- }
- }
-
- private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
- SubQueryEvent, SubQueryState> {
-
- @Override
- public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- subQuery.setStartTime();
- ExecutionBlock execBlock = subQuery.getBlock();
- SubQueryState state;
-
- try {
- // Union operator does not require actual query processing. It is performed logically.
- if (execBlock.hasUnion()) {
- subQuery.finish();
- state = SubQueryState.SUCCEEDED;
- } else {
- setRepartitionIfNecessary(subQuery);
- createTasks(subQuery);
-
- if (subQuery.tasks.size() == 0) { // if there is no tasks
- subQuery.finish();
- return SubQueryState.SUCCEEDED;
- } else {
- initTaskScheduler(subQuery);
- allocateContainers(subQuery);
- return SubQueryState.INIT;
- }
- }
- } catch (Exception e) {
- LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
- subQuery.eventHandler.handle(
- new QueryDiagnosticsUpdateEvent(subQuery.getId().getQueryId(), e.getMessage()));
- subQuery.eventHandler.handle(
- new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.FAILED));
- return SubQueryState.FAILED;
- }
-
- return state;
- }
-
- private void initTaskScheduler(SubQuery subQuery) {
- subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.context);
- subQuery.taskScheduler.init(subQuery.context.getConf());
- subQuery.taskScheduler.start();
- }
-
- /**
- * If a parent block requires a repartition operation, the method sets proper repartition
- * methods and the number of partitions to a given subquery.
- */
- private static void setRepartitionIfNecessary(SubQuery subQuery) {
- if (subQuery.getBlock().hasParentBlock()) {
- int numTasks = calculatePartitionNum(subQuery);
- Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
- }
- }
-
- /**
- * Getting the desire number of partitions according to the volume of input data.
- * This method is only used to determine the partition key number of hash join or aggregation.
- *
- * @param subQuery
- * @return
- */
- public static int calculatePartitionNum(SubQuery subQuery) {
- TajoConf conf = subQuery.context.getConf();
- ExecutionBlock parent = subQuery.getBlock().getParentBlock();
-
- GroupbyNode grpNode = null;
- if (parent != null) {
- grpNode = (GroupbyNode) PlannerUtil.findTopNode(
- parent.getPlan(), ExprType.GROUP_BY);
- }
-
- // Is this subquery the first step of join?
- if (parent != null && parent.getScanNodes().length == 2) {
- Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
-
- // for inner
- ExecutionBlock outer = child.next();
- long outerVolume = getInputVolume(subQuery.context, outer);
-
- // for inner
- ExecutionBlock inner = child.next();
- long innerVolume = getInputVolume(subQuery.context, inner);
- LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
- LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
-
- long smaller = Math.min(outerVolume, innerVolume);
-
- int mb = (int) Math.ceil((double)smaller / 1048576);
- LOG.info("Smaller Table's volume is approximately " + mb + " MB");
- // determine the number of task
- int taskNum = (int) Math.ceil((double)mb /
- conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME));
- LOG.info("The determined number of join partitions is " + taskNum);
- return taskNum;
-
- // Is this subquery the first step of group-by?
- } else if (grpNode != null) {
-
- if (grpNode.getGroupingColumns().length == 0) {
- return 1;
- } else {
- long volume = getInputVolume(subQuery.context, subQuery.block);
-
- int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info("Table's volume is approximately " + mb + " MB");
- // determine the number of task
- int taskNum = (int) Math.ceil((double)mb /
- conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME));
- LOG.info("The determined number of aggregation partitions is " + taskNum);
- return taskNum;
- }
- } else {
- LOG.info("============>>>>> Unexpected Case! <<<<<================");
- long volume = getInputVolume(subQuery.context, subQuery.block);
-
- int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info("Table's volume is approximately " + mb + " MB");
- // determine the number of task per 128MB
- int taskNum = (int) Math.ceil((double)mb / 128);
- LOG.info("The determined number of partitions is " + taskNum);
- return taskNum;
- }
- }
-
- private static void createTasks(SubQuery subQuery) throws IOException {
- ExecutionBlock execBlock = subQuery.getBlock();
- QueryUnit [] tasks;
- if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
- tasks = createLeafTasks(subQuery);
-
- } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
- tasks = Repartitioner.createJoinTasks(subQuery);
-
- } else { // Case 3: Others (Sort or Aggregation)
- int numTasks = getNonLeafTaskNum(subQuery);
- SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId();
- SubQuery child = subQuery.context.getSubQuery(childId);
- tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
- }
-
- LOG.info("Create " + tasks.length + " Tasks");
-
- for (QueryUnit task : tasks) {
- subQuery.addTask(task);
- }
- }
-
- /**
- * Getting the desire number of tasks according to the volume of input data
- *
- * @param subQuery
- * @return
- */
- public static int getNonLeafTaskNum(SubQuery subQuery) {
- // Getting intermediate data size
- long volume = getInputVolume(subQuery.context, subQuery.getBlock());
-
- int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info("Table's volume is approximately " + mb + " MB");
- // determine the number of task per 64MB
- int maxTaskNum = (int) Math.ceil((double)mb / 64);
- LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
- return maxTaskNum;
- }
-
- public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) {
- CatalogService catalog = context.getCatalog();
- if (execBlock.isLeafBlock()) {
- ScanNode outerScan = execBlock.getScanNodes()[0];
- TableStat stat = catalog.getTableDesc(outerScan.getTableId()).getMeta().getStat();
- return stat.getNumBytes();
- } else {
- long aggregatedVolume = 0;
- for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
- SubQuery subquery = context.getSubQuery(childBlock.getId());
- aggregatedVolume += subquery.getTableStat().getNumBytes();
- }
-
- return aggregatedVolume;
- }
- }
-
- public static void allocateContainers(SubQuery subQuery) {
- ExecutionBlock execBlock = subQuery.getBlock();
- QueryUnit [] tasks = subQuery.getQueryUnits();
-
- int numClusterNodes = subQuery.getContext().getNumClusterNode();
- int numRequest = Math.min(tasks.length, numClusterNodes * 4);
-
- final Resource resource = Records.newRecord(Resource.class);
- if (tasks.length <= numClusterNodes) {
- resource.setMemory(subQuery.context.getMaxContainerCapability());
- } else {
- resource.setMemory(2000);
- }
-
- Priority priority = Records.newRecord(Priority.class);
- priority.setPriority(subQuery.getPriority());
- ContainerAllocationEvent event =
- new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
- subQuery.getId(), priority, resource, numRequest,
- execBlock.isLeafBlock(), 0.0f);
- subQuery.eventHandler.handle(event);
- }
-
- private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
- ExecutionBlock execBlock = subQuery.getBlock();
- ScanNode[] scans = execBlock.getScanNodes();
- Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
- TableMeta meta;
- Path inputPath;
-
- ScanNode scan = scans[0];
- TableDesc desc = subQuery.context.getCatalog().getTableDesc(scan.getTableId());
- inputPath = desc.getPath();
- meta = desc.getMeta();
-
- // TODO - should be change the inner directory
- Path oldPath = new Path(inputPath, "data");
- FileSystem fs = inputPath.getFileSystem(subQuery.context.getConf());
- if (fs.exists(oldPath)) {
- inputPath = oldPath;
- }
- List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath);
-
- QueryUnit queryUnit;
- List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
-
- int i = 0;
- for (Fragment fragment : fragments) {
- queryUnit = newQueryUnit(subQuery, i++, fragment);
- queryUnits.add(queryUnit);
- }
-
- return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
- }
-
- private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
- ExecutionBlock execBlock = subQuery.getBlock();
- QueryUnit unit = new QueryUnit(
- QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
- subQuery.eventHandler);
- unit.setLogicalPlan(execBlock.getPlan());
- unit.setFragment2(fragment);
- return unit;
- }
- }
-
- int i = 0;
- private static class ContainerLaunchTransition
- implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
- @Override
- public void transition(SubQuery subQuery, SubQueryEvent event) {
- SubQueryContainerAllocationEvent allocationEvent =
- (SubQueryContainerAllocationEvent) event;
- for (Container container : allocationEvent.getAllocatedContainer()) {
- ContainerId cId = container.getId();
- if (subQuery.containers.containsKey(cId)) {
- LOG.info(">>>>>>>>>>>> Duplicate Container! <<<<<<<<<<<");
- }
- subQuery.containers.put(cId, container);
- // TODO - This is debugging message. Should be removed
- subQuery.i++;
- }
- LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
- subQuery.eventHandler.handle(
- new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
- subQuery.getId(), allocationEvent.getAllocatedContainer()));
-
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
- SubQueryEventType.SQ_START));
- }
- }
-
- private static class StartTransition implements
- MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
-
- @Override
- public SubQueryState transition(SubQuery subQuery,
- SubQueryEvent subQueryEvent) {
- // schedule tasks
- try {
- for (QueryUnitId taskId : subQuery.tasks.keySet()) {
- subQuery.eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
- }
-
- return SubQueryState.RUNNING;
- } catch (Exception e) {
- LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
- return SubQueryState.FAILED;
- }
- }
- }
-
- private static class TaskCompletedTransition
- implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
- @Override
- public void transition(SubQuery subQuery,
- SubQueryEvent event) {
- subQuery.completedTaskCount++;
- SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
- QueryUnitAttempt task = subQuery.getQueryUnit(taskEvent.getTaskId()).getSuccessfulAttempt();
-
- LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount + "/"
- + subQuery.tasks.size() + " on " + task.getHost());
- if (subQuery.completedTaskCount == subQuery.tasks.size()) {
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
- SubQueryEventType.SQ_SUBQUERY_COMPLETED));
- }
- }
- }
-
- private static class SubQueryCompleteTransition
- implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
- @Override
- public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- // TODO - Commit subQuery & do cleanup
- // TODO - records succeeded, failed, killed completed task
- // TODO - records metrics
- subQuery.stopScheduler();
- subQuery.releaseContainers();
- subQuery.finish();
- }
- }
-
- private static class InternalErrorTransition
- implements SingleArcTransition<SubQuery, SubQueryEvent> {
-
- @Override
- public void transition(SubQuery subQuery,
- SubQueryEvent subQueryEvent) {
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQueryState.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQueryState.java
deleted file mode 100644
index 673f1df..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQueryState.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-public enum SubQueryState {
- NEW,
- CONTAINER_ALLOCATED,
- INIT,
- RUNNING,
- SUCCEEDED,
- FAILED
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
deleted file mode 100644
index 82b1445..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TajoMaster.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.common.collect.Maps;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.RackResolver;
-import tajo.QueryId;
-import tajo.QueryIdFactory;
-import tajo.TajoConstants;
-import tajo.catalog.*;
-import tajo.catalog.proto.CatalogProtos.FunctionType;
-import tajo.common.TajoDataTypes.Type;
-import tajo.conf.TajoConf;
-import tajo.conf.TajoConf.ConfVars;
-import tajo.engine.MasterWorkerProtos.TaskStatusProto;
-import tajo.engine.function.Country;
-import tajo.engine.function.InCountry;
-import tajo.engine.function.builtin.*;
-import tajo.master.event.QueryEvent;
-import tajo.master.event.QueryEventType;
-import tajo.storage.StorageManager;
-import tajo.webapp.StaticHttpServer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class TajoMaster extends CompositeService {
-
- /** Class Logger */
- private static final Log LOG = LogFactory.getLog(TajoMaster.class);
-
- public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
- private MasterContext context;
- private TajoConf conf;
- private FileSystem defaultFS;
- private Clock clock;
-
- private Path basePath;
- private Path wareHousePath;
-
- private CatalogServer catalogServer;
- private CatalogService catalog;
- private StorageManager storeManager;
- private GlobalEngine globalEngine;
- private AsyncDispatcher dispatcher;
- private ClientService clientService;
- private YarnRPC yarnRPC;
-
- //Web Server
- private StaticHttpServer webServer;
-
- public TajoMaster() throws Exception {
- super(TajoMaster.class.getName());
- }
-
- @Override
- public void init(Configuration _conf) {
- this.conf = (TajoConf) _conf;
-
- context = new MasterContext(conf);
- clock = new SystemClock();
-
-
- try {
- webServer = StaticHttpServer.getInstance(this ,"admin", null, 8080 ,
- true, null, context.getConf(), null);
- webServer.start();
-
- QueryIdFactory.reset();
-
- // Get the tajo base dir
- this.basePath = new Path(conf.getVar(ConfVars.ROOT_DIR));
- LOG.info("Tajo Root dir is set " + basePath);
- // Get default DFS uri from the base dir
- this.defaultFS = basePath.getFileSystem(conf);
- conf.set("fs.defaultFS", defaultFS.getUri().toString());
- LOG.info("FileSystem (" + this.defaultFS.getUri() + ") is initialized.");
-
- if (!defaultFS.exists(basePath)) {
- defaultFS.mkdirs(basePath);
- LOG.info("Tajo Base dir (" + basePath + ") is created.");
- }
-
- this.storeManager = new StorageManager(conf);
-
- // Get the tajo data warehouse dir
- this.wareHousePath = new Path(basePath, TajoConstants.WAREHOUSE_DIR);
- LOG.info("Tajo Warehouse dir is set to " + wareHousePath);
- if (!defaultFS.exists(wareHousePath)) {
- defaultFS.mkdirs(wareHousePath);
- LOG.info("Warehouse dir (" + wareHousePath + ") is created");
- }
-
- yarnRPC = YarnRPC.create(conf);
-
- this.dispatcher = new AsyncDispatcher();
- addIfService(dispatcher);
-
- // The below is some mode-dependent codes
- // If tajo is local mode
- final boolean mode = conf.getBoolVar(ConfVars.CLUSTER_DISTRIBUTED);
- if (!mode) {
- LOG.info("Enabled Pseudo Distributed Mode");
- } else { // if tajo is distributed mode
- LOG.info("Enabled Distributed Mode");
- }
- // This is temporal solution of the above problem.
- catalogServer = new CatalogServer(initBuiltinFunctions());
- addIfService(catalogServer);
- catalog = new LocalCatalog(catalogServer);
-
- globalEngine = new GlobalEngine(context, storeManager);
- addIfService(globalEngine);
-
- dispatcher.register(QueryEventType.class, new QueryEventDispatcher());
-
- clientService = new ClientService(context);
- addIfService(clientService);
-
- RackResolver.init(conf);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- super.init(conf);
- }
-
- @SuppressWarnings("unchecked")
- public static List<FunctionDesc> initBuiltinFunctions() throws ServiceException {
- List<FunctionDesc> sqlFuncs = new ArrayList<FunctionDesc>();
-
- // Sum
- sqlFuncs.add(new FunctionDesc("sum", SumInt.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.INT4),
- CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
- sqlFuncs.add(new FunctionDesc("sum", SumLong.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.INT8),
- CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
- sqlFuncs.add(new FunctionDesc("sum", SumFloat.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
- sqlFuncs.add(new FunctionDesc("sum", SumDouble.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
-
- // Max
- sqlFuncs.add(new FunctionDesc("max", MaxInt.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.INT4),
- CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
- sqlFuncs.add(new FunctionDesc("max", MaxLong.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.INT8),
- CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
- sqlFuncs.add(new FunctionDesc("max", MaxFloat.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
- sqlFuncs.add(new FunctionDesc("max", MaxDouble.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
-
- // Min
- sqlFuncs.add(new FunctionDesc("min", MinInt.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.INT4),
- CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
- sqlFuncs.add(new FunctionDesc("min", MinLong.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.INT8),
- CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
- sqlFuncs.add(new FunctionDesc("min", MinFloat.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4 )));
- sqlFuncs.add(new FunctionDesc("min", MinDouble.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
- sqlFuncs.add(new FunctionDesc("min", MinString.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
- CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
-
- // AVG
- sqlFuncs.add(new FunctionDesc("avg", AvgInt.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
- CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
- sqlFuncs.add(new FunctionDesc("avg", AvgLong.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
- CatalogUtil.newDataTypesWithoutLen(Type.INT8)));
- sqlFuncs.add(new FunctionDesc("avg", AvgFloat.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4),
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT4)));
- sqlFuncs.add(new FunctionDesc("avg", AvgDouble.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8),
- CatalogUtil.newDataTypesWithoutLen(Type.FLOAT8)));
-
- // Count
- sqlFuncs.add(new FunctionDesc("count", CountValue.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.INT8),
- CatalogUtil.newDataTypesWithoutLen(Type.ANY)));
- sqlFuncs.add(new FunctionDesc("count", CountRows.class, FunctionType.AGGREGATION,
- CatalogUtil.newDataTypesWithoutLen(Type.INT8),
- CatalogUtil.newDataTypesWithoutLen()));
-
- // GeoIP
- sqlFuncs.add(new FunctionDesc("in_country", InCountry.class, FunctionType.GENERAL,
- CatalogUtil.newDataTypesWithoutLen(Type.BOOLEAN),
- CatalogUtil.newDataTypesWithoutLen(Type.TEXT, Type.TEXT)));
- sqlFuncs.add(new FunctionDesc("country", Country.class, FunctionType.GENERAL,
- CatalogUtil.newDataTypesWithoutLen(Type.TEXT),
- CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
-
- // Date
- sqlFuncs.add(new FunctionDesc("date", Date.class, FunctionType.GENERAL,
- CatalogUtil.newDataTypesWithoutLen(Type.INT8),
- CatalogUtil.newDataTypesWithoutLen(Type.TEXT)));
-
- // Today
- sqlFuncs.add(new FunctionDesc("today", Date.class, FunctionType.GENERAL,
- CatalogUtil.newDataTypesWithoutLen(Type.INT8),
- CatalogUtil.newDataTypesWithoutLen()));
-
- sqlFuncs.add(
- new FunctionDesc("random", RandomInt.class, FunctionType.GENERAL,
- CatalogUtil.newDataTypesWithoutLen(Type.INT4),
- CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-
- return sqlFuncs;
- }
-
- public MasterContext getContext() {
- return this.context;
- }
-
- protected void addIfService(Object object) {
- if (object instanceof Service) {
- addService((Service) object);
- }
- }
-
- @Override
- public void start() {
- LOG.info("TajoMaster startup");
- super.start();
- }
-
- @Override
- public void stop() {
- try {
- webServer.stop();
- } catch (Exception e) {
- LOG.error(e);
- }
-
- super.stop();
- LOG.info("TajoMaster main thread exiting");
- }
-
- public EventHandler getEventHandler() {
- return dispatcher.getEventHandler();
- }
-
- public String getMasterServerName() {
- return null;
- }
-
- public boolean isMasterRunning() {
- return getServiceState() == STATE.STARTED;
- }
-
- public CatalogService getCatalog() {
- return this.catalog;
- }
-
- public StorageManager getStorageManager() {
- return this.storeManager;
- }
-
- // TODO - to be improved
- public Collection<TaskStatusProto> getProgressQueries() {
- return null;
- }
-
- private class QueryEventDispatcher implements EventHandler<QueryEvent> {
- @Override
- public void handle(QueryEvent queryEvent) {
- LOG.info("QueryEvent: " + queryEvent.getQueryId());
- LOG.info("Found: " + context.getQuery(queryEvent.getQueryId()).getContext().getQueryId());
- context.getQuery(queryEvent.getQueryId()).handle(queryEvent);
- }
- }
-
- public static void main(String[] args) throws Exception {
- StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
-
- try {
- TajoMaster master = new TajoMaster();
- ShutdownHookManager.get().addShutdownHook(
- new CompositeServiceShutdownHook(master),
- SHUTDOWN_HOOK_PRIORITY);
- TajoConf conf = new TajoConf(new YarnConfiguration());
- master.init(conf);
- master.start();
- } catch (Throwable t) {
- LOG.fatal("Error starting JobHistoryServer", t);
- System.exit(-1);
- }
- }
-
- public class MasterContext {
- private final Map<QueryId, QueryMaster> queries = Maps.newConcurrentMap();
- private final TajoConf conf;
-
- public MasterContext(TajoConf conf) {
- this.conf = conf;
- }
-
- public TajoConf getConf() {
- return conf;
- }
-
- public Clock getClock() {
- return clock;
- }
-
- public QueryMaster getQuery(QueryId queryId) {
- return queries.get(queryId);
- }
-
- public Map<QueryId, QueryMaster> getAllQueries() {
- return queries;
- }
-
- public AsyncDispatcher getDispatcher() {
- return dispatcher;
- }
-
- public EventHandler getEventHandler() {
- return dispatcher.getEventHandler();
- }
-
- public CatalogService getCatalog() {
- return catalog;
- }
-
- public GlobalEngine getGlobalEngine() {
- return globalEngine;
- }
-
- public StorageManager getStorageManager() {
- return storeManager;
- }
-
- public YarnRPC getYarnRPC() {
- return yarnRPC;
- }
-
- public ClientService getClientService() {
- return clientService;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
deleted file mode 100644
index d1cdbc2..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import tajo.SubQueryId;
-import tajo.master.TaskRunnerGroupEvent.EventType;
-
-import java.util.Collection;
-
-public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
- public enum EventType {
- CONTAINER_REMOTE_LAUNCH,
- CONTAINER_REMOTE_CLEANUP
- }
-
- protected final SubQueryId subQueryId;
- protected final Collection<Container> containers;
- public TaskRunnerGroupEvent(EventType eventType,
- SubQueryId subQueryId,
- Collection<Container> containers) {
- super(eventType);
- this.subQueryId = subQueryId;
- this.containers = containers;
- }
-
- public Collection<Container> getContainers() {
- return containers;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
deleted file mode 100644
index 6b73d00..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-
-public interface TaskRunnerLauncher extends EventHandler<TaskRunnerGroupEvent> {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
deleted file mode 100644
index 2279d1b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
-import org.apache.hadoop.yarn.util.Records;
-import tajo.QueryConf;
-import tajo.SubQueryId;
-import tajo.conf.TajoConf;
-import tajo.master.QueryMaster.QueryContext;
-import tajo.master.TaskRunnerGroupEvent.EventType;
-import tajo.master.event.QueryEvent;
-import tajo.master.event.QueryEventType;
-import tajo.pullserver.PullServerAuxService;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
-
- /** Class Logger */
- private static final Log LOG = LogFactory.getLog(TaskRunnerLauncherImpl.class);
- private final YarnRPC yarnRPC;
- private final static RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
- private QueryContext context;
- private final String taskListenerHost;
- private final int taskListenerPort;
-
- // For ContainerLauncherSpec
- private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
- private static String initialClasspath = null;
- private static final Object classpathLock = new Object();
- private Object commonContainerSpecLock = new Object();
- private ContainerLaunchContext commonContainerSpec = null;
-
- final public static FsPermission QUERYCONF_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
-
- /** for launching TaskRunners in parallel */
- private final ExecutorService executorService;
-
- public TaskRunnerLauncherImpl(QueryContext context) {
- super(TaskRunnerLauncherImpl.class.getName());
- this.context = context;
- taskListenerHost = context.getTaskListener().getHostName();
- taskListenerPort = context.getTaskListener().getPort();
- yarnRPC = context.getYarnRPC();
- executorService = Executors.newFixedThreadPool(
- context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
- }
-
- public void start() {
- super.start();
- }
-
- public void stop() {
- executorService.shutdown();
- super.stop();
- }
-
- @Override
- public void handle(TaskRunnerGroupEvent event) {
- if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
- launchTaskRunners(event.subQueryId, event.getContainers());
- } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
- killTaskRunners(event.getContainers());
- }
- }
-
- private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers) {
- for (Container container : containers) {
- final ContainerProxy proxy = new ContainerProxy(container, subQueryId);
- executorService.submit(new LaunchRunner(proxy));
- }
- }
-
- private class LaunchRunner implements Runnable {
- private final ContainerProxy proxy;
- public LaunchRunner(ContainerProxy proxy) {
- this.proxy = proxy;
- }
- @Override
- public void run() {
- proxy.launch();
- }
- }
-
- private void killTaskRunners(Collection<Container> containers) {
- for (Container container : containers) {
- final ContainerProxy proxy = context.getContainer(container.getId());
- executorService.submit(new KillRunner(proxy));
- }
- }
-
- private class KillRunner implements Runnable {
- private final ContainerProxy proxy;
- public KillRunner(ContainerProxy proxy) {
- this.proxy = proxy;
- }
-
- @Override
- public void run() {
- proxy.kill();
- }
- }
-
-
- /**
- * Lock this on initialClasspath so that there is only one fork in the AM for
- * getting the initial class-path. TODO: We already construct
- * a parent CLC and use it for all the containers, so this should go away
- * once the mr-generated-classpath stuff is gone.
- */
- private static String getInitialClasspath(Configuration conf) {
- synchronized (classpathLock) {
- if (initialClasspathFlag.get()) {
- return initialClasspath;
- }
- Map<String, String> env = new HashMap<String, String>();
-
- initialClasspath = env.get(Environment.CLASSPATH.name());
- initialClasspathFlag.set(true);
- return initialClasspath;
- }
- }
-
- private ContainerLaunchContext createCommonContainerLaunchContext() {
- TajoConf conf = (TajoConf) getConfig();
-
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- try {
- ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the env variables to be setup
- ////////////////////////////////////////////////////////////////////////////
- LOG.info("Set the environment for the application master");
-
- Map<String, String> environment = new HashMap<String, String>();
- //String initialClassPath = getInitialClasspath(conf);
- environment.put(Environment.SHELL.name(), "/bin/bash");
- environment.put(Environment.JAVA_HOME.name(), System.getenv(Environment.JAVA_HOME.name()));
-
- // TODO - to be improved with tajo.sh shell script
- Properties prop = System.getProperties();
- if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE")) {
- environment.put(Environment.CLASSPATH.name(), prop.getProperty(
- "java.class.path", null));
- } else {
- // Add AppMaster.jar location to classpath
- // At some point we should not be required to add
- // the hadoop specific classpaths to the env.
- // It should be provided out of the box.
- // For now setting all required classpaths including
- // the classpath to "." for the application jar
- StringBuilder classPathEnv = new StringBuilder("./");
- //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
- for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
- classPathEnv.append(':');
- classPathEnv.append(c.trim());
- }
-
- classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
- classPathEnv.append(":./log4j.properties:./*");
- environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
- environment.put(
- Environment.HADOOP_COMMON_HOME.name(),
- System.getenv("HADOOP_HOME"));
- environment.put(
- Environment.HADOOP_HDFS_HOME.name(),
- System.getenv("HADOOP_HOME"));
- environment.put(
- Environment.HADOOP_YARN_HOME.name(),
- System.getenv("HADOOP_HOME"));
- environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
- environment.put(Environment.CLASSPATH.name(), classPathEnv.toString());
- }
-
- ctx.setEnvironment(environment);
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- FileSystem fs = null;
-
-
- LOG.info("defaultFS: " + conf.get("fs.default.name"));
- LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
- try {
- fs = FileSystem.get(conf);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- FileContext fsCtx = null;
- try {
- fsCtx = FileContext.getFileContext(getConfig());
- } catch (UnsupportedFileSystemException e) {
- e.printStackTrace();
- }
-
- LOG.info("Writing a QueryConf to HDFS and add to local environment");
- Path queryConfPath = new Path(fs.getHomeDirectory(), QueryConf.FILENAME);
- try {
- writeConf(conf, queryConfPath);
-
- LocalResource queryConfSrc = createApplicationResource(fsCtx,
- queryConfPath, LocalResourceType.FILE);
- localResources.put(QueryConf.FILENAME, queryConfSrc);
-
- ctx.setLocalResources(localResources);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- // Add shuffle token
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
- try {
- //LOG.info("Putting shuffle token in serviceData");
- serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID,
- PullServerAuxService.serializeMetaData(0));
- } catch (IOException ioe) {
- LOG.error(ioe);
- }
- ctx.setServiceData(serviceData);
-
- return ctx;
- }
-
- protected ContainerManager getCMProxy(ContainerId containerID,
- final String containerManagerBindAddr,
- ContainerToken containerToken)
- throws IOException {
- String [] hosts = containerManagerBindAddr.split(":");
- final InetSocketAddress cmAddr =
- new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- Token<ContainerTokenIdentifier> token =
- ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
- // the user in createRemoteUser in this context has to be ContainerID
- user = UserGroupInformation.createRemoteUser(containerID.toString());
- user.addToken(token);
- }
-
- ContainerManager proxy = user
- .doAs(new PrivilegedAction<ContainerManager>() {
- @Override
- public ContainerManager run() {
- return (ContainerManager) yarnRPC.getProxy(ContainerManager.class,
- cmAddr, getConfig());
- }
- });
- return proxy;
- }
-
- private LocalResource createApplicationResource(FileContext fs,
- Path p, LocalResourceType type)
- throws IOException {
- LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
- FileStatus rsrcStat = fs.getFileStatus(p);
- rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
- .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
- rsrc.setSize(rsrcStat.getLen());
- rsrc.setTimestamp(rsrcStat.getModificationTime());
- rsrc.setType(type);
- rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- return rsrc;
- }
-
- private void writeConf(Configuration conf, Path queryConfFile)
- throws IOException {
- // Write job file to Tajo's fs
- FileSystem fs = queryConfFile.getFileSystem(conf);
- FSDataOutputStream out =
- FileSystem.create(fs, queryConfFile,
- new FsPermission(QUERYCONF_FILE_PERMISSION));
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
-
- private static enum ContainerState {
- PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
- }
-
- public class ContainerProxy {
- private ContainerState state;
- // store enough information to be able to cleanup the container
- private Container container;
- private ContainerId containerID;
- final private String containerMgrAddress;
- private ContainerToken containerToken;
- private String hostName;
- private int port = -1;
- private final SubQueryId subQueryId;
-
- public ContainerProxy(Container container, SubQueryId subQueryId) {
- this.state = ContainerState.PREP;
- this.container = container;
- this.containerID = container.getId();
- NodeId nodeId = container.getNodeId();
- this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();;
- this.containerToken = container.getContainerToken();
- this.subQueryId = subQueryId;
- }
-
- public synchronized boolean isCompletelyDone() {
- return state == ContainerState.DONE || state == ContainerState.FAILED;
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void launch() {
- LOG.info("Launching Container with Id: " + containerID);
- if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
- state = ContainerState.DONE;
- LOG.error("Container (" + containerID + " was killed before it was launched");
- return;
- }
-
- ContainerManager proxy = null;
- try {
-
- proxy = getCMProxy(containerID, containerMgrAddress,
- containerToken);
-
- // Construct the actual Container
- ContainerLaunchContext containerLaunchContext = createContainerLaunchContext();
-
- // Now launch the actual container
- StartContainerRequest startRequest = Records
- .newRecord(StartContainerRequest.class);
- startRequest.setContainerLaunchContext(containerLaunchContext);
- StartContainerResponse response = proxy.startContainer(startRequest);
-
- ByteBuffer portInfo = response
- .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
-
- if(portInfo != null) {
- port = PullServerAuxService.deserializeMetaData(portInfo);
- }
-
- LOG.info("PullServer port returned by ContainerManager for "
- + containerID + " : " + port);
-
- if(port < 0) {
- this.state = ContainerState.FAILED;
- throw new IllegalStateException("Invalid shuffle port number "
- + port + " returned for " + containerID);
- }
-
- // after launching, send launched event to task attempt to move
- // it from ASSIGNED to RUNNING state
-// context.getEventHandler().handle(new AMContainerEventLaunched(containerID, port));
-
- // this is workaround code
- context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
-
- this.state = ContainerState.RUNNING;
- this.hostName = containerMgrAddress.split(":")[0];
- context.addContainer(containerID, this);
- } catch (Throwable t) {
- String message = "Container launch failed for " + containerID + " : "
- + StringUtils.stringifyException(t);
- this.state = ContainerState.FAILED;
- LOG.error(message);
- } finally {
- if (proxy != null) {
- yarnRPC.stopProxy(proxy, getConfig());
- }
- }
- }
-
- public synchronized void kill() {
-
- if(isCompletelyDone()) {
- return;
- }
- if(this.state == ContainerState.PREP) {
- this.state = ContainerState.KILLED_BEFORE_LAUNCH;
- } else {
- LOG.info("KILLING " + containerID);
-
- ContainerManager proxy = null;
- try {
- proxy = getCMProxy(this.containerID, this.containerMgrAddress,
- this.containerToken);
-
- // kill the remote container if already launched
- StopContainerRequest stopRequest = Records
- .newRecord(StopContainerRequest.class);
- stopRequest.setContainerId(this.containerID);
- proxy.stopContainer(stopRequest);
- // If stopContainer returns without an error, assuming the stop made
- // it over to the NodeManager.
-// context.getEventHandler().handle(
-// new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
- context.removeContainer(containerID);
- } catch (Throwable t) {
-
- // ignore the cleanup failure
- String message = "cleanup failed for container "
- + this.containerID + " : "
- + StringUtils.stringifyException(t);
-// context.getEventHandler().handle(
-// new AMContainerEventStopFailed(containerID, message));
- LOG.warn(message);
- this.state = ContainerState.DONE;
- return;
- } finally {
- if (proxy != null) {
- yarnRPC.stopProxy(proxy, getConfig());
- }
- }
- this.state = ContainerState.DONE;
- }
- }
-
- public ContainerLaunchContext createContainerLaunchContext() {
- synchronized (commonContainerSpecLock) {
- if (commonContainerSpec == null) {
- commonContainerSpec = createCommonContainerLaunchContext();
- }
- }
-
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerSpec.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
-
- // Duplicate the ByteBuffers for access by multiple containers.
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
- for (Map.Entry<String, ByteBuffer> entry : commonContainerSpec
- .getServiceData().entrySet()) {
- myServiceData.put(entry.getKey(), entry.getValue().duplicate());
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- // Set the necessary command to execute the application master
- Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
- // Set java executable command
- //LOG.info("Setting up app master command");
- vargs.add("${JAVA_HOME}" + "/bin/java");
- // Set Xmx based on am memory size
- vargs.add("-Xmx2000m");
- // Set Remote Debugging
- //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
- //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
- //}
- // Set class name
- vargs.add("tajo.worker.TaskRunner");
- vargs.add(taskListenerHost); // tasklistener hostname
- vargs.add(String.valueOf(taskListenerPort)); // tasklistener hostname
- vargs.add(subQueryId.toString()); // subqueryId
- vargs.add(containerMgrAddress); // nodeId
- vargs.add(containerID.toString()); // containerId
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- LOG.info("Completed setting up TaskRunner command " + command.toString());
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
-
- return BuilderUtils.newContainerLaunchContext(containerID, commonContainerSpec.getUser(),
- container.getResource(), commonContainerSpec.getLocalResources(), myEnv, commands,
- myServiceData, null, new HashMap<ApplicationAccessType, String>());
- }
-
- public String getHostName() {
- return this.hostName;
- }
-
- public int getPullServerPort() {
- return this.port;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerListener.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerListener.java
deleted file mode 100644
index 661b458..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerListener.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import tajo.QueryUnitAttemptId;
-import tajo.TajoIdProtos.QueryUnitAttemptIdProto;
-import tajo.conf.TajoConf;
-import tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import tajo.engine.MasterWorkerProtos.TaskCompletionReport;
-import tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
-import tajo.engine.MasterWorkerProtos.TaskStatusProto;
-import tajo.ipc.MasterWorkerProtocol;
-import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import tajo.master.QueryMaster.QueryContext;
-import tajo.master.event.TaskAttemptStatusUpdateEvent;
-import tajo.master.event.TaskCompletionEvent;
-import tajo.master.event.TaskFatalErrorEvent;
-import tajo.master.event.TaskRequestEvent;
-import tajo.rpc.ProtoAsyncRpcServer;
-import tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-public class TaskRunnerListener extends AbstractService
- implements MasterWorkerProtocolService.Interface {
-
- private final static Log LOG = LogFactory.getLog(
- tajo.master.cluster.WorkerListener.class);
- private QueryContext context;
- private ProtoAsyncRpcServer rpcServer;
- private InetSocketAddress bindAddr;
- private String addr;
-
- public TaskRunnerListener(final QueryContext context) throws Exception {
- super(tajo.master.cluster.WorkerListener.class.getName());
- this.context = context;
-
-
- InetSocketAddress initIsa =
- new InetSocketAddress(InetAddress.getLocalHost(), 0);
- if (initIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of " + initIsa);
- }
- try {
- this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
- this, initIsa);
- } catch (Exception e) {
- LOG.error(e);
- }
- this.rpcServer.start();
- this.bindAddr = rpcServer.getBindAddress();
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
- }
-
- @Override
- public void init(Configuration conf) {
- // Setup RPC server
- try {
- InetSocketAddress initIsa =
- new InetSocketAddress(InetAddress.getLocalHost(), 0);
- if (initIsa.getAddress() == null) {
- throw new IllegalArgumentException("Failed resolve of " + initIsa);
- }
-
- this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
- this, initIsa);
-
- this.rpcServer.start();
- this.bindAddr = rpcServer.getBindAddress();
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
- } catch (Exception e) {
- LOG.error(e);
- }
-
- // Get the master address
- LOG.info(tajo.master.cluster.WorkerListener.class.getSimpleName() + " is bind to " + addr);
- context.getConf().setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
-
- super.init(conf);
- }
-
- @Override
- public void start() {
-
-
- super.start();
- }
-
- @Override
- public void stop() {
- rpcServer.shutdown();
- super.stop();
- }
-
- public InetSocketAddress getBindAddress() {
- return this.bindAddr;
- }
-
- public String getAddress() {
- return this.addr;
- }
-
- static BoolProto TRUE_PROTO = BoolProto.newBuilder().setValue(true).build();
-
- @Override
- public void getTask(RpcController controller, ContainerIdProto request,
- RpcCallback<QueryUnitRequestProto> done) {
- context.getEventHandler().handle(new TaskRequestEvent(
- new ContainerIdPBImpl(request), done));
- }
-
- @Override
- public void statusUpdate(RpcController controller, TaskStatusProto request,
- RpcCallback<BoolProto> done) {
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
- context.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId,
- request));
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void ping(RpcController controller,
- QueryUnitAttemptIdProto attemptIdProto,
- RpcCallback<BoolProto> done) {
- // TODO - to be completed
- QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-// context.getQuery(attemptId.getQueryId()).getSubQuery(attemptId.getSubQueryId()).
-// getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
-// resetExpireTime();
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void fatalError(RpcController controller, TaskFatalErrorReport report,
- RpcCallback<BoolProto> done) {
- context.getEventHandler().handle(new TaskFatalErrorEvent(report));
- done.run(TRUE_PROTO);
- }
-
- @Override
- public void done(RpcController controller, TaskCompletionReport report,
- RpcCallback<BoolProto> done) {
- context.getEventHandler().handle(new TaskCompletionEvent(report));
- done.run(TRUE_PROTO);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskScheduler.java
deleted file mode 100644
index e4218f0..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskScheduler.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.event.EventHandler;
-import tajo.master.event.TaskSchedulerEvent;
-
-public interface TaskScheduler extends EventHandler<TaskSchedulerEvent> {
-
-}