You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:01 UTC
[3/8] TAJO-91: Launch QueryMaster on NodeManager per query.
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
new file mode 100644
index 0000000..5443858
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.ipc.QueryMasterProtocol.Partition;
+import org.apache.tajo.ipc.QueryMasterProtocol.TaskCompletionReport;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A single execution attempt of a {@link QueryUnit}.
+ *
+ * <p>The attempt's lifecycle is modelled by a YARN state machine built from
+ * {@code stateMachineFactory}. Every {@link TaskAttemptEvent} passed to
+ * {@link #handle(TaskAttemptEvent)} is applied while holding the write lock, and
+ * the transitions report back to the rest of the query master by emitting events
+ * (for example {@link TaskScheduleEvent} and {@link TaskTAttemptEvent}) through
+ * the shared event handler.
+ */
+public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
+
+  private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
+
+  /**
+   * Initial and reset value of the expiry countdown. NOTE(review): the unit is
+   * whatever callers pass to {@link #updateExpireTime(int)} (presumably
+   * milliseconds) -- confirm against the caller.
+   */
+  private static final int EXPIRE_TIME = 15000;
+
+  private final QueryUnitAttemptId id;
+  private final QueryUnit queryUnit;
+  /** Sink for the events emitted by the transitions (scheduling, task and query events). */
+  final EventHandler eventHandler;
+
+  /** Host this attempt was assigned to; set by LaunchTransition. */
+  private String hostName;
+  /** Pull-server port on the assigned host; used when building intermediate entries. */
+  private int port;
+  /** Remaining expiry countdown; guarded by this object's monitor. */
+  private int expire;
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  /** Non-empty error messages collected from fatal-error events. */
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  private static final StateMachineFactory
+      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      stateMachineFactory = new StateMachineFactory
+      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      (TaskAttemptState.TA_NEW)
+
+      // from new: ask the scheduler to place this attempt
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+
+      // from unassigned
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED, new LaunchTransition())
+
+      // from assigned
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+      // StatusUpdateTransition returns the current state (TA_ASSIGNED) for any
+      // reported status other than pending/running, so TA_ASSIGNED must be a valid
+      // post state here; otherwise such an update fails the post-state check.
+      .addTransition(TaskAttemptState.TA_ASSIGNED,
+          EnumSet.of(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_RUNNING,
+              TaskAttemptState.TA_KILLED),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      // from running
+      .addTransition(TaskAttemptState.TA_RUNNING,
+          EnumSet.of(TaskAttemptState.TA_RUNNING),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      // from succeeded: late updates and duplicate completions are ignored
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_UPDATE)
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      .installTopology();
+
+  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      stateMachine;
+
+  /**
+   * Creates a new attempt in state {@code TA_NEW}.
+   *
+   * @param id the attempt id
+   * @param queryUnit the task this attempt belongs to
+   * @param eventHandler the handler that receives events emitted by transitions
+   */
+  public QueryUnitAttempt(final QueryUnitAttemptId id, final QueryUnit queryUnit,
+                          final EventHandler eventHandler) {
+    this.id = id;
+    this.expire = QueryUnitAttempt.EXPIRE_TIME;
+    this.queryUnit = queryUnit;
+    this.eventHandler = eventHandler;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  /** @return the current state of this attempt, read under the read lock */
+  public TaskAttemptState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public QueryUnitAttemptId getId() {
+    return this.id;
+  }
+
+  /** @return whether the owning {@link QueryUnit} is a leaf task */
+  public boolean isLeafTask() {
+    return this.queryUnit.isLeafTask();
+  }
+
+  public QueryUnit getQueryUnit() {
+    return this.queryUnit;
+  }
+
+  public String getHost() {
+    return this.hostName;
+  }
+
+  public void setHost(String host) {
+    this.hostName = host;
+  }
+
+  public void setPullServerPort(int port) {
+    this.port = port;
+  }
+
+  public int getPullServerPort() {
+    return port;
+  }
+
+  public synchronized void setExpireTime(int expire) {
+    this.expire = expire;
+  }
+
+  /** Decreases the expiry countdown by {@code period}. */
+  public synchronized void updateExpireTime(int period) {
+    this.setExpireTime(this.expire - period);
+  }
+
+  /** Restores the expiry countdown to its initial value. */
+  public synchronized void resetExpireTime() {
+    this.setExpireTime(QueryUnitAttempt.EXPIRE_TIME);
+  }
+
+  /**
+   * @return the remaining expiry countdown. Synchronized like its writers so that
+   *         the latest value is visible to the calling thread.
+   */
+  public synchronized int getLeftTime() {
+    return this.expire;
+  }
+
+  /**
+   * Copies partition information and result statistics from a completion report
+   * into the owning {@link QueryUnit}. Each reported partition becomes an
+   * {@link IntermediateEntry} that records this attempt's host and pull-server port.
+   */
+  private void fillTaskStatistics(TaskCompletionReport report) {
+    if (report.getPartitionsCount() > 0) {
+      this.getQueryUnit().setPartitions(report.getPartitionsList());
+
+      List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+      for (Partition p : report.getPartitionsList()) {
+        IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
+            getId().getId(), p.getPartitionKey(), getHost(), getPullServerPort());
+        partitions.add(entry);
+      }
+      this.getQueryUnit().setIntermediateData(partitions);
+    }
+    if (report.hasResultStats()) {
+      this.getQueryUnit().setStats(new TableStat(report.getResultStats()));
+    }
+  }
+
+  /**
+   * Sends a {@link TaskScheduleEvent} for the attempt. A leaf task with exactly one
+   * scan node is scheduled with its data locations and their resolved racks;
+   * every other task is scheduled without locality hints.
+   */
+  private static class TaskAttemptScheduleTransition implements
+      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+
+      if (taskAttempt.isLeafTask()
+          && taskAttempt.getQueryUnit().getScanNodes().length == 1) {
+        Set<String> racks = new HashSet<String>();
+        for (String host : taskAttempt.getQueryUnit().getDataLocations()) {
+          racks.add(RackResolver.resolve(host).getNetworkLocation());
+        }
+
+        taskAttempt.eventHandler.handle(new TaskScheduleEvent(
+            taskAttempt.getId(), EventType.T_SCHEDULE, true,
+            taskAttempt.getQueryUnit().getDataLocations(),
+            racks.toArray(new String[racks.size()])
+        ));
+      } else {
+        taskAttempt.eventHandler.handle(new TaskScheduleEvent(
+            taskAttempt.getId(), EventType.T_SCHEDULE,
+            false,
+            null,
+            null
+        ));
+      }
+    }
+  }
+
+  /**
+   * Records the assigned host and pull-server port from the
+   * {@link TaskAttemptAssignedEvent} and notifies the task that the attempt launched.
+   */
+  private static class LaunchTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
+      taskAttempt.setHost(castEvent.getHostName());
+      taskAttempt.setPullServerPort(castEvent.getPullServerPort());
+      taskAttempt.eventHandler.handle(
+          new TaskTAttemptEvent(taskAttempt.getId(),
+              TaskEventType.T_ATTEMPT_LAUNCHED));
+    }
+  }
+
+  /**
+   * Maps a reported status to the next state: pending or running status moves the
+   * attempt to {@code TA_RUNNING}; any other status keeps the current state.
+   */
+  private static class StatusUpdateTransition
+      implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
+
+    @Override
+    public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
+                                       TaskAttemptEvent event) {
+      TaskAttemptStatusUpdateEvent updateEvent =
+          (TaskAttemptStatusUpdateEvent) event;
+
+      switch (updateEvent.getStatus().getState()) {
+        case TA_PENDING:
+        case TA_RUNNING:
+          return TaskAttemptState.TA_RUNNING;
+
+        default:
+          // the state machine has not switched state yet, so this is the pre-state
+          return taskAttempt.getState();
+      }
+    }
+  }
+
+  /** Appends {@code diag} to the diagnostics unless it is null or empty. */
+  private void addDiagnosticInfo(String diag) {
+    if (diag != null && !diag.equals("")) {
+      diagnostics.add(diag);
+    }
+  }
+
+  /** Logs and ignores a duplicate assignment. */
+  private static class AlreadyAssignedTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt queryUnitAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+      LOG.info(">>>>>>>>> Already Assigned: " + queryUnitAttempt.getId());
+    }
+  }
+
+  /** Logs and ignores a duplicate completion. */
+  private static class AlreadyDoneTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt queryUnitAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+      LOG.info(">>>>>>>>> Already Done: " + queryUnitAttempt.getId());
+    }
+  }
+
+  /**
+   * Stores the statistics carried by the {@link TaskCompletionEvent} and notifies
+   * the task that the attempt succeeded.
+   */
+  private static class SucceededTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskCompletionReport report = ((TaskCompletionEvent) event).getReport();
+
+      taskAttempt.fillTaskStatistics(report);
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(),
+          TaskEventType.T_ATTEMPT_SUCCEEDED));
+    }
+  }
+
+  /**
+   * Notifies the task that the attempt failed, logs the reported error together
+   * with the host, and keeps the message as diagnostic information.
+   */
+  private static class FailedTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
+      taskAttempt.eventHandler.handle(
+          new TaskTAttemptEvent(taskAttempt.getId(),
+              TaskEventType.T_ATTEMPT_FAILED));
+      LOG.error("FROM " + taskAttempt.getHost() + " >> "
+          + errorEvent.errorMessage());
+      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
+    }
+  }
+
+  /**
+   * Applies {@code event} to the state machine under the write lock. An event
+   * that is not valid in the current state is logged and reported to the query
+   * as {@code QueryEventType.INTERNAL_ERROR} instead of being propagated.
+   */
+  @Override
+  public void handle(TaskAttemptEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskAttemptId() + " of type "
+          + event.getType());
+    }
+    // Acquire before entering try so that finally never releases a lock this
+    // thread does not hold.
+    writeLock.lock();
+    try {
+      // reading the state takes the read lock, which the write-lock holder may acquire
+      TaskAttemptState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state of "
+            + event.getTaskAttemptId() + ")", e);
+        eventHandler.handle(new QueryEvent(getId().getQueryId(),
+            QueryEventType.INTERNAL_ERROR));
+      }
+
+      // log the state change, if any
+      if (LOG.isDebugEnabled()) {
+        TaskAttemptState newState = getState();
+        if (oldState != newState) {
+          LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
+              + newState);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
new file mode 100644
index 0000000..3957d57
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
+import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * Repartitioner creates non-leaf tasks and shuffles intermediate data.
+ * It supports two repartition methods: hash and range repartitioning.
+ */
+public class Repartitioner {
+  private static final Log LOG = LogFactory.getLog(Repartitioner.class);
+
+  /**
+   * Upper bound on the length of the task-attempt id list placed in one fetch URI.
+   * It stays below the ~2000-character request length that may trigger
+   * HTTP 414 (Request-URI Too Long).
+   */
+  private static final int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
+
+  /**
+   * Creates the tasks of a two-way join block.
+   *
+   * <p>If either input is broadcast, a single task reading both inputs' fragments
+   * is created. Otherwise the intermediate data of both child subqueries is grouped
+   * by partition id and table, and the partitions are distributed round-robin over
+   * join tasks. The number of join tasks is derived from the size of the larger
+   * input ({@code JOIN_TASK_VOLUME} MB per task) and is capped by the number of
+   * distinct partition ids. Tasks that end up with fewer than two fetches are dropped.
+   *
+   * @param subQuery the join subquery; its block is expected to have two scan nodes
+   * @return the created join tasks
+   * @throws IOException if splitting an input table fails
+   */
+  public static QueryUnit[] createJoinTasks(SubQuery subQuery)
+      throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+
+    ScanNode[] scans = execBlock.getScanNodes();
+    Path tablePath;
+    Fragment[] fragments = new Fragment[2];
+    TableStat[] stats = new TableStat[2];
+
+    // initialize variables from the child operators
+    for (int i = 0; i < 2; i++) {
+      // TODO - temporarily tables should be stored in temporarily catalog for each query
+      TableDesc tableDesc = subQuery.getContext().getTableDescMap().get(
+          scans[i].getFromTable().getTableName());
+      if (scans[i].getTableId().startsWith(SubQueryId.PREFIX)) {
+        // table ids with the subquery prefix are resolved via the storage manager
+        tablePath = subQuery.getStorageManager().getTablePath(scans[i].getTableId());
+      } else {
+        tablePath = tableDesc.getPath();
+      }
+
+      if (scans[i].isLocal()) { // it only requires a dummy fragment.
+        fragments[i] = new Fragment(scans[i].getTableId(), tablePath,
+            CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV),
+            0, 0, null);
+      } else {
+        fragments[i] = subQuery.getStorageManager().getSplits(scans[i].getTableId(),
+            tableDesc.getMeta(),
+            new Path(tablePath, "data")).get(0);
+      }
+
+      // Getting a table stat for each scan
+      stats[i] = subQuery.getChildQuery(scans[i]).getTableStat();
+    }
+
+    // Assigning either fragments or fetch urls to query units
+    QueryUnit[] tasks;
+    if (scans[0].isBroadcast() || scans[1].isBroadcast()) {
+      tasks = new QueryUnit[1];
+      tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
+          false, subQuery.getEventHandler());
+      tasks[0].setLogicalPlan(execBlock.getPlan());
+      tasks[0].setFragment(scans[0].getTableId(), fragments[0]);
+      tasks[0].setFragment(scans[1].getTableId(), fragments[1]);
+    } else {
+      // The hash map is modeled as follows:
+      // <Partition Id, <Table Name, Intermediate Data>>
+      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
+          new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
+
+      // Grouping IntermediateData by a partition key and a table name
+      for (ScanNode scan : scans) {
+        SubQuery childSubQuery = subQuery.getChildQuery(scan);
+        for (QueryUnit task : childSubQuery.getQueryUnits()) {
+          if (task.getIntermediateData() == null) {
+            continue;
+          }
+          for (IntermediateEntry intermEntry : task.getIntermediateData()) {
+            Map<String, List<IntermediateEntry>> tbNameToInterm =
+                hashEntries.get(intermEntry.getPartitionId());
+            if (tbNameToInterm == null) {
+              tbNameToInterm = new HashMap<String, List<IntermediateEntry>>();
+              hashEntries.put(intermEntry.getPartitionId(), tbNameToInterm);
+            }
+
+            List<IntermediateEntry> entries = tbNameToInterm.get(scan.getTableId());
+            if (entries == null) {
+              tbNameToInterm.put(scan.getTableId(), TUtil.newList(intermEntry));
+            } else {
+              entries.add(intermEntry);
+            }
+          }
+        }
+      }
+
+      LOG.info("Outer Intermediate Volume: " + stats[0].getNumBytes());
+      LOG.info("Inner Intermediate Volume: " + stats[1].getNumBytes());
+
+      // Getting the desired number of join tasks according to the volume
+      // of the larger table
+      int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
+      int desireJoinTaskVolumn = subQuery.getContext().getConf().
+          getIntVar(ConfVars.JOIN_TASK_VOLUME);
+
+      // calculate the number of tasks according to the data size
+      int mb = (int) Math.ceil((double) stats[largerIdx].getNumBytes() / 1048576);
+      LOG.info("Larger intermediate data is approximately " + mb + " MB");
+      // each task should handle about JOIN_TASK_VOLUME MB of the larger input
+      int maxTaskNum = (int) Math.ceil((double) mb / desireJoinTaskVolumn);
+      LOG.info("The calculated number of tasks is " + maxTaskNum);
+      LOG.info("The number of total partition keys is " + hashEntries.size());
+      // the number of join tasks cannot be larger than the number of
+      // distinct partition ids.
+      int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
+      if (joinTaskNum < 1) {
+        // maxTaskNum can be <= 0 (e.g. an empty larger input gives mb == 0). With
+        // partitions to join we still need one task, otherwise the round-robin
+        // assignment below would index into an empty array.
+        joinTaskNum = hashEntries.isEmpty() ? 0 : 1;
+      }
+      LOG.info("The determined number of join tasks is " + joinTaskNum);
+      QueryUnit[] createdTasks = newEmptyJoinTask(subQuery, fragments, joinTaskNum);
+
+      // Assign partitions to tasks in a round robin manner.
+      // The loop body only runs when hashEntries is non-empty, i.e. joinTaskNum >= 1.
+      int i = 0;
+      for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
+          : hashEntries.entrySet()) {
+        addJoinPartition(createdTasks[i], subQuery, entry.getKey(), entry.getValue());
+        i = (i + 1) % joinTaskNum;
+      }
+
+      List<QueryUnit> filteredTasks = new ArrayList<QueryUnit>();
+      for (QueryUnit task : createdTasks) {
+        // if there are at least two fetches, the join is possible.
+        if (task.getFetches().size() > 1) {
+          filteredTasks.add(task);
+        }
+      }
+
+      tasks = filteredTasks.toArray(new QueryUnit[filteredTasks.size()]);
+    }
+
+    return tasks;
+  }
+
+  /**
+   * Creates {@code taskNum} join tasks that carry the block's plan and the given
+   * fragments, but no fetches yet.
+   */
+  private static QueryUnit[] newEmptyJoinTask(SubQuery subQuery, Fragment[] fragments, int taskNum) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    QueryUnit[] tasks = new QueryUnit[taskNum];
+    for (int i = 0; i < taskNum; i++) {
+      tasks[i] = new QueryUnit(
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), i), execBlock.isLeafBlock(),
+          subQuery.getEventHandler());
+      tasks[i].setLogicalPlan(execBlock.getPlan());
+      for (Fragment fragment : fragments) {
+        tasks[i].setFragment2(fragment);
+      }
+    }
+
+    return tasks;
+  }
+
+  /**
+   * Adds the hash-fetch URIs of one partition to {@code task}, one group per scan
+   * node of the block. If some scan node has no intermediate data for this
+   * partition, the method stops at that scan node, and fetches for it and for
+   * later scan nodes are not added.
+   */
+  private static void addJoinPartition(QueryUnit task, SubQuery subQuery, int partitionId,
+                                       Map<String, List<IntermediateEntry>> grouppedPartitions) {
+
+    for (ScanNode scanNode : subQuery.getBlock().getScanNodes()) {
+      List<IntermediateEntry> partitions = grouppedPartitions.get(scanNode.getTableId());
+      if (partitions == null) {
+        return;
+      }
+      Map<String, List<IntermediateEntry>> requests = mergeHashPartitionRequest(partitions);
+
+      Set<URI> fetchURIs = TUtil.newHashSet();
+      for (Entry<String, List<IntermediateEntry>> requestPerNode : requests.entrySet()) {
+        Collection<URI> uris = createHashFetchURL(requestPerNode.getKey(),
+            subQuery.getChildQuery(scanNode).getId(),
+            partitionId, PartitionType.HASH,
+            requestPerNode.getValue());
+        fetchURIs.addAll(uris);
+      }
+      task.addFetches(scanNode.getTableId(), fetchURIs);
+    }
+  }
+
+  /**
+   * This method merges the partition request associated with the pullserver's address.
+   * It reduces the number of TCP connections.
+   *
+   * @return key: pullserver's address, value: a list of requests
+   */
+  private static Map<String, List<IntermediateEntry>> mergeHashPartitionRequest(
+      List<IntermediateEntry> partitions) {
+    Map<String, List<IntermediateEntry>> mergedPartitions =
+        new HashMap<String, List<IntermediateEntry>>();
+    for (IntermediateEntry partition : partitions) {
+      if (mergedPartitions.containsKey(partition.getPullAddress())) {
+        mergedPartitions.get(partition.getPullAddress()).add(partition);
+      } else {
+        mergedPartitions.put(partition.getPullAddress(), TUtil.newList(partition));
+      }
+    }
+
+    return mergedPartitions;
+  }
+
+  /**
+   * Creates the tasks of a non-leaf block according to the partition type of its
+   * child block.
+   *
+   * @throws InternalException if the child's partition type is neither HASH nor RANGE
+   */
+  public static QueryUnit[] createNonLeafTask(SubQuery subQuery,
+                                              SubQuery childSubQuery,
+                                              int maxNum)
+      throws InternalException {
+    ExecutionBlock childExecBlock = childSubQuery.getBlock();
+    if (childExecBlock.getPartitionType() == PartitionType.HASH) {
+      return createHashPartitionedTasks(subQuery, childSubQuery, maxNum);
+    } else if (childExecBlock.getPartitionType() == PartitionType.RANGE) {
+      return createRangePartitionedTasks(subQuery, childSubQuery, maxNum);
+    } else {
+      throw new InternalException("Cannot support partition type");
+    }
+  }
+
+  /**
+   * Creates tasks for a block whose input is range-partitioned by a child sort.
+   *
+   * <p>The key range covered by the child's column statistics is split into at
+   * most {@code maxNum} sub-ranges (fewer if the range cardinality is smaller).
+   * Every sub-range becomes a set of range-fetch URIs, one per child intermediate
+   * entry, and the sets are assigned to tasks round-robin. An empty child yields
+   * no tasks.
+   */
+  public static QueryUnit[] createRangePartitionedTasks(SubQuery subQuery,
+                                                        SubQuery childSubQuery,
+                                                        int maxNum)
+      throws InternalException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    TableStat stat = childSubQuery.getTableStat();
+    if (stat.getNumRows() == 0) {
+      return new QueryUnit[0];
+    }
+
+    ScanNode scan = execBlock.getScanNodes()[0];
+    Path tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
+
+    StoreTableNode store = (StoreTableNode) childSubQuery.getBlock().getPlan();
+    SortNode sort = (SortNode) store.getSubNode();
+    SortSpec[] sortSpecs = sort.getSortKeys();
+    Schema sortSchema = PlannerUtil.sortSpecsToSchema(sort.getSortKeys());
+
+    // calculate the number of maximum query ranges
+    TupleRange mergedRange =
+        TupleUtil.columnStatToRange(sort.getOutSchema(),
+            sortSchema, stat.getColumnStats());
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(sortSchema, mergedRange);
+    BigDecimal card = partitioner.getTotalCardinality();
+
+    // if the number of the range cardinality is less than the desired number of tasks,
+    // we set the the number of tasks to the number of range cardinality.
+    int determinedTaskNum;
+    if (card.compareTo(new BigDecimal(maxNum)) < 0) {
+      LOG.info("The range cardinality (" + card
+          + ") is less then the desired number of tasks (" + maxNum + ")");
+      determinedTaskNum = card.intValue();
+    } else {
+      determinedTaskNum = maxNum;
+    }
+
+    LOG.info("Try to divide " + mergedRange + " into " + determinedTaskNum +
+        " sub ranges (total units: " + determinedTaskNum + ")");
+    TupleRange[] ranges = partitioner.partition(determinedTaskNum);
+
+    Fragment dummyFragment = new Fragment(scan.getTableId(), tablePath,
+        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
+        0, 0, null);
+
+    List<String> basicFetchURIs = new ArrayList<String>();
+
+    SubQuery child = childSubQuery.getContext().getSubQuery(
+        subQuery.getBlock().getChildBlock(scan).getId());
+    for (QueryUnit qu : child.getQueryUnits()) {
+      if (qu.getIntermediateData() == null) {
+        // like the hash and join paths, skip tasks that produced no intermediate data
+        continue;
+      }
+      for (IntermediateEntry p : qu.getIntermediateData()) {
+        String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(),
+            childSubQuery.getId(), p.taskId, p.attemptId);
+        basicFetchURIs.add(uri);
+      }
+    }
+
+    // keep the ranges ordered in the direction of the first sort key
+    boolean ascendingFirstKey = sortSpecs[0].isAscending();
+    SortedMap<TupleRange, Set<URI>> map;
+    if (ascendingFirstKey) {
+      map = new TreeMap<TupleRange, Set<URI>>();
+    } else {
+      map = new TreeMap<TupleRange, Set<URI>>(new TupleRange.DescendingTupleRangeComparator());
+    }
+
+    Set<URI> uris;
+    try {
+      for (int i = 0; i < ranges.length; i++) {
+        uris = new HashSet<URI>();
+        for (String uri : basicFetchURIs) {
+          // the last range (first range for descending order) is flagged by the
+          // final rangeToQuery argument
+          String rangeParam = TupleUtil.rangeToQuery(sortSchema, ranges[i],
+              ascendingFirstKey, ascendingFirstKey ? i == (ranges.length - 1) : i == 0);
+          URI finalUri = URI.create(uri + "&" + rangeParam);
+          uris.add(finalUri);
+        }
+        map.put(ranges[i], uris);
+      }
+
+    } catch (UnsupportedEncodingException e) {
+      // log with the stack trace; LOG.error(Object) alone would only print toString()
+      LOG.error("Cannot encode a range query for " + scan.getTableId(), e);
+    }
+
+    QueryUnit[] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, dummyFragment);
+    assignPartitionByRoundRobin(map, scan.getTableId(), tasks);
+    return tasks;
+  }
+
+  /**
+   * Adds the URIs of each partition to {@code tasks} in round-robin order. All URIs
+   * of one partition go to the same task; the next partition goes to the next
+   * task, wrapping around after the last one.
+   *
+   * @return {@code tasks}, for chaining
+   */
+  public static QueryUnit[] assignPartitionByRoundRobin(Map<?, Set<URI>> partitions,
+                                                        String tableName, QueryUnit[] tasks) {
+    int tid = 0;
+    for (Entry<?, Set<URI>> entry : partitions.entrySet()) {
+      for (URI uri : entry.getValue()) {
+        tasks[tid].addFetch(tableName, uri);
+      }
+      // wrap immediately so tid never reaches tasks.length
+      tid = (tid + 1) % tasks.length;
+    }
+
+    return tasks;
+  }
+
+  /**
+   * Builds the range-fetch URI prefix for one child task attempt:
+   * {@code http://host:port/?sid=<sid>&ta=<task>_<attempt>&p=0&type=r}.
+   * Callers append the encoded range parameter.
+   */
+  public static String createBasicFetchUri(String hostName, int port,
+                                           SubQueryId childSid,
+                                           int taskId, int attemptId) {
+    String scheme = "http://";
+    StringBuilder sb = new StringBuilder(scheme);
+    sb.append(hostName).append(":").append(port)
+        .append("/?").append("sid=").append(childSid.getId())
+        .append("&").append("ta=").append(taskId).append("_").append(attemptId)
+        .append("&").append("p=0")
+        .append("&").append("type=r");
+
+    return sb.toString();
+  }
+
+  /**
+   * Creates tasks for a block whose input is hash-partitioned by a child block.
+   *
+   * <p>Intermediate entries are grouped by partition id, then by pull-server host,
+   * and turned into fetch URIs. The number of tasks is
+   * {@code min(maxNum, number of partition ids)}, or 1 when the child is a group-by
+   * without grouping columns. Partitions are distributed round-robin. An empty
+   * child yields no tasks.
+   */
+  public static QueryUnit[] createHashPartitionedTasks(SubQuery subQuery,
+                                                       SubQuery childSubQuery,
+                                                       int maxNum) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    TableStat stat = childSubQuery.getTableStat();
+    if (stat.getNumRows() == 0) {
+      return new QueryUnit[0];
+    }
+
+    ScanNode scan = execBlock.getScanNodes()[0];
+    Path tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
+
+    List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+    for (QueryUnit tasks : childSubQuery.getQueryUnits()) {
+      if (tasks.getIntermediateData() != null) {
+        partitions.addAll(tasks.getIntermediateData());
+      }
+    }
+
+    Fragment frag = new Fragment(scan.getTableId(), tablePath,
+        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
+        0, 0, null);
+
+    Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
+    Map<String, List<IntermediateEntry>> hashedByHost;
+    Map<Integer, List<URI>> finalFetchURI = new HashMap<Integer, List<URI>>();
+
+    for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
+      hashedByHost = hashByHost(interm.getValue());
+      for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
+        Collection<URI> uris = createHashFetchURL(e.getKey(), childSubQuery.getId(),
+            interm.getKey(),
+            childSubQuery.getBlock().getPartitionType(), e.getValue());
+
+        if (finalFetchURI.containsKey(interm.getKey())) {
+          finalFetchURI.get(interm.getKey()).addAll(uris);
+        } else {
+          finalFetchURI.put(interm.getKey(), TUtil.newList(uris));
+        }
+      }
+    }
+
+    // the number of tasks cannot exceed the number of merged fetch uris.
+    int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
+    // A group-by without grouping columns produces a single aggregate, so it runs as
+    // one task. The child's store sub-node is not necessarily a group-by, so check
+    // its type instead of casting blindly.
+    LogicalNode childSubNode = childSubQuery.getBlock().getStoreTableNode().getSubNode();
+    if (childSubNode instanceof GroupbyNode
+        && ((GroupbyNode) childSubNode).getGroupingColumns().length == 0) {
+      determinedTaskNum = 1;
+    }
+
+    QueryUnit[] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, frag);
+
+    int tid = 0;
+    for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
+      for (URI uri : entry.getValue()) {
+        tasks[tid].addFetch(scan.getTableId(), uri);
+      }
+
+      tid++;
+
+      if (tid == tasks.length) {
+        tid = 0;
+      }
+    }
+
+    return tasks;
+  }
+
+  /**
+   * Builds the fetch URIs for one partition on one pull server:
+   * {@code http://hostAndPort/?sid=<sid>&p=<partition>&type=<h|r>&ta=<id>,<id>,...}.
+   * The task-attempt list is split across several URIs so that no list exceeds
+   * {@link #HTTP_REQUEST_MAXIMUM_LENGTH} characters.
+   *
+   * @return one URI per chunk of task-attempt ids; empty if {@code entries} is empty
+   */
+  public static Collection<URI> createHashFetchURL(String hostAndPort, SubQueryId childSid,
+                                                   int partitionId, PartitionType type,
+                                                   List<IntermediateEntry> entries) {
+    String scheme = "http://";
+    StringBuilder urlPrefix = new StringBuilder(scheme);
+    urlPrefix.append(hostAndPort)
+        .append("/?").append("sid=").append(childSid.getId())
+        .append("&").append("p=").append(partitionId)
+        .append("&").append("type=");
+    if (type == PartitionType.HASH) {
+      urlPrefix.append("h");
+    } else if (type == PartitionType.RANGE) {
+      urlPrefix.append("r");
+    }
+    urlPrefix.append("&ta=");
+
+    // If the get request is longer than 2000 characters,
+    // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long.
+    // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15
+    // The below code transforms a long request to multiple requests.
+    List<String> taskIdsParams = new ArrayList<String>();
+    StringBuilder taskIdListBuilder = new StringBuilder();
+    for (IntermediateEntry entry : entries) {
+      String taskAttemptId = entry.getTaskId() + "_" + entry.getAttemptId();
+
+      if (taskIdListBuilder.length() == 0) {
+        // the first id of a chunk is always taken, so no chunk is ever empty
+        taskIdListBuilder.append(taskAttemptId);
+      } else if (taskIdListBuilder.length() + 1 + taskAttemptId.length()
+          > HTTP_REQUEST_MAXIMUM_LENGTH) {
+        // the id plus its separating comma would exceed the limit: start a new chunk
+        taskIdsParams.add(taskIdListBuilder.toString());
+        taskIdListBuilder = new StringBuilder(taskAttemptId);
+      } else {
+        taskIdListBuilder.append(",").append(taskAttemptId);
+      }
+    }
+
+    // if the url params remain
+    if (taskIdListBuilder.length() > 0) {
+      taskIdsParams.add(taskIdListBuilder.toString());
+    }
+
+    Collection<URI> fetchURLs = new ArrayList<URI>();
+    for (String param : taskIdsParams) {
+      fetchURLs.add(URI.create(urlPrefix + param));
+    }
+
+    return fetchURLs;
+  }
+
+  /** Groups intermediate entries by partition id. */
+  public static Map<Integer, List<IntermediateEntry>> hashByKey(
+      List<IntermediateEntry> entries) {
+    Map<Integer, List<IntermediateEntry>> hashed = new HashMap<Integer, List<IntermediateEntry>>();
+    for (IntermediateEntry entry : entries) {
+      if (hashed.containsKey(entry.getPartitionId())) {
+        hashed.get(entry.getPartitionId()).add(entry);
+      } else {
+        hashed.put(entry.getPartitionId(), TUtil.newList(entry));
+      }
+    }
+
+    return hashed;
+  }
+
+  /**
+   * Creates {@code num} non-leaf tasks that carry the block's plan and the given
+   * (dummy) fragment, but no fetches yet.
+   */
+  public static QueryUnit[] createEmptyNonLeafTasks(SubQuery subQuery, int num,
+                                                    Fragment frag) {
+    LogicalNode plan = subQuery.getBlock().getPlan();
+    QueryUnit[] tasks = new QueryUnit[num];
+    for (int i = 0; i < num; i++) {
+      tasks[i] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), i),
+          false, subQuery.getEventHandler());
+      tasks[i].setFragment2(frag);
+      tasks[i].setLogicalPlan(plan);
+    }
+    return tasks;
+  }
+
+  /** Groups intermediate entries by their {@code pullHost:pullPort} address. */
+  public static Map<String, List<IntermediateEntry>> hashByHost(
+      List<IntermediateEntry> entries) {
+    Map<String, List<IntermediateEntry>> hashed = new HashMap<String, List<IntermediateEntry>>();
+
+    String hostName;
+    for (IntermediateEntry entry : entries) {
+      hostName = entry.getPullHost() + ":" + entry.getPullPort();
+      if (hashed.containsKey(hostName)) {
+        hashed.get(hostName).add(entry);
+      } else {
+        hashed.put(hostName, TUtil.newList(entry));
+      }
+    }
+
+    return hashed;
+  }
+
+  /**
+   * Configures how the subquery's store node partitions its output.
+   *
+   * <p>Keys are taken from the parent join (when the parent's store sub-node is a
+   * join), from the grouping columns (HASH over a group-by), or from the sort keys
+   * (RANGE over a sort). With no keys found, list partitioning is used. With an
+   * empty key set, a single partition is used. Otherwise {@code n} partitions
+   * are used.
+   *
+   * @return {@code subQuery}, for chaining
+   */
+  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    Column[] keys = null;
+    // if the next query is join,
+    // set the partition number for the current logicalUnit
+    // TODO: the union handling is required when a join has unions as its child
+    ExecutionBlock parentBlock = execBlock.getParentBlock();
+    if (parentBlock != null) {
+      if (parentBlock.getStoreTableNode().getSubNode().getType() == ExprType.JOIN) {
+        execBlock.getStoreTableNode().setPartitions(execBlock.getPartitionType(),
+            execBlock.getStoreTableNode().getPartitionKeys(), n);
+        keys = execBlock.getStoreTableNode().getPartitionKeys();
+      }
+    }
+
+    StoreTableNode store = execBlock.getStoreTableNode();
+    // set the partition number for group by and sort
+    if (execBlock.getPartitionType() == PartitionType.HASH) {
+      if (store.getSubNode().getType() == ExprType.GROUP_BY) {
+        GroupbyNode groupby = (GroupbyNode) store.getSubNode();
+        keys = groupby.getGroupingColumns();
+      }
+    } else if (execBlock.getPartitionType() == PartitionType.RANGE) {
+      if (store.getSubNode().getType() == ExprType.SORT) {
+        SortNode sort = (SortNode) store.getSubNode();
+        keys = new Column[sort.getSortKeys().length];
+        for (int i = 0; i < keys.length; i++) {
+          keys[i] = sort.getSortKeys()[i].getSortKey();
+        }
+      }
+    }
+    if (keys != null) {
+      if (keys.length == 0) {
+        store.setPartitions(execBlock.getPartitionType(), new Column[]{}, 1);
+      } else {
+        store.setPartitions(execBlock.getPartitionType(), keys, n);
+      }
+    } else {
+      store.setListPartition();
+    }
+    return subQuery;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
new file mode 100644
index 0000000..305ef1b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStat;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.TaskScheduler;
+import org.apache.tajo.master.TaskSchedulerImpl;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.StorageManager;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
+
+/**
+ * SubQuery plays a role in controlling an ExecutionBlock and is a finite state machine.
+ */
+public class SubQuery implements EventHandler<SubQueryEvent> {
+
+  private static final Log LOG = LogFactory.getLog(SubQuery.class);
+
+  private ExecutionBlock block;
+  private int priority;
+  private TableMeta meta;
+  private EventHandler eventHandler;
+  private final StorageManager sm;
+  private TaskSchedulerImpl taskScheduler;
+  private QueryContext context;
+
+  private long startTime;
+  private long finishTime;
+
+  volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>();
+  volatile Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+
+  /**
+   * Debugging counter of distinct containers registered by {@link ContainerLaunchTransition}.
+   * It mirrors {@code containers.size()}; kept (package-private) only for compatibility.
+   */
+  int i = 0;
+
+  private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION =
+      new ContainerLaunchTransition();
+  private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION =
+      new InternalErrorTransition();
+
+  private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> stateMachine;
+
+  /*
+   * SQ_INTERNAL_ERROR is registered in EVERY state. handle() dispatches SQ_INTERNAL_ERROR
+   * whenever a transition is invalid; if a state did not accept SQ_INTERNAL_ERROR itself, that
+   * event would also be invalid and would be re-dispatched forever.
+   */
+  private StateMachineFactory<SubQuery, SubQueryState, SubQueryEventType, SubQueryEvent>
+      stateMachineFactory =
+      new StateMachineFactory<SubQuery, SubQueryState, SubQueryEventType, SubQueryEvent>(
+          SubQueryState.NEW)
+
+          // Transitions from NEW
+          .addTransition(SubQueryState.NEW,
+              EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
+              SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
+          .addTransition(SubQueryState.NEW, SubQueryState.FAILED,
+              SubQueryEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from INIT
+          .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
+          .addTransition(SubQueryState.INIT, SubQueryState.FAILED,
+              SubQueryEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from CONTAINER_ALLOCATED
+          .addTransition(SubQueryState.CONTAINER_ALLOCATED,
+              EnumSet.of(SubQueryState.RUNNING, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
+              SubQueryEventType.SQ_START, new StartTransition())
+          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
+          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from RUNNING
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, SubQueryEventType.SQ_START)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_TASK_COMPLETED, new TaskCompletedTransition())
+          .addTransition(SubQueryState.RUNNING, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_SUBQUERY_COMPLETED, new SubQueryCompleteTransition())
+          .addTransition(SubQueryState.RUNNING, SubQueryState.FAILED,
+              SubQueryEventType.SQ_FAILED, INTERNAL_ERROR_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.FAILED,
+              SubQueryEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from SUCCEEDED (late events are ignored)
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_START)
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_INTERNAL_ERROR)
+
+          // Transitions from FAILED (late events are ignored)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_START)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_FAILED)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_INTERNAL_ERROR);
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private int completedTaskCount = 0;
+
+  /**
+   * Creates a subquery that controls the given execution block.
+   *
+   * @param context the query context; also supplies the event handler used by this subquery
+   * @param block the execution block this subquery runs
+   * @param sm the storage manager used to read inputs and write table metadata
+   */
+  public SubQuery(QueryContext context, ExecutionBlock block, StorageManager sm) {
+    this.context = context;
+    this.block = block;
+    this.sm = sm;
+    this.eventHandler = context.getEventHandler();
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public QueryContext getContext() {
+    return context;
+  }
+
+  public EventHandler getEventHandler() {
+    return eventHandler;
+  }
+
+  /** Returns the task scheduler, or {@code null} if it has not been started yet. */
+  public TaskScheduler getTaskScheduler() {
+    return taskScheduler;
+  }
+
+  /** Records the current time of the query context's clock as the start time. */
+  public void setStartTime() {
+    startTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  /** Records the current time of the query context's clock as the finish time. */
+  public void setFinishTime() {
+    finishTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  /**
+   * Returns the fraction of tasks that have completed. Returns 0 while the subquery is NEW or
+   * while no task has completed (including subqueries that finished without creating tasks).
+   */
+  public float getProgress() {
+    readLock.lock();
+    try {
+      if (getState() == SubQueryState.NEW || completedTaskCount == 0) {
+        return 0.0f;
+      }
+      return (float) completedTaskCount / (float) tasks.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public ExecutionBlock getBlock() {
+    return block;
+  }
+
+  /** Registers a task with this subquery, keyed by its id. */
+  public void addTask(QueryUnit task) {
+    tasks.put(task.getId(), task);
+  }
+
+  /**
+   * Notifies the event handler that this subquery has completed with the given final state.
+   */
+  public void abortSubQuery(SubQueryState finalState) {
+    // TODO -
+    // - committer.abortSubQuery(...)
+    // - record SubQuery Finish Time
+    // - CleanUp Tasks
+    // - Record History
+
+    eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
+  }
+
+  public StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> getStateMachine() {
+    return this.stateMachine;
+  }
+
+  public void setPriority(int priority) {
+    this.priority = priority;
+  }
+
+  public int getPriority() {
+    return this.priority;
+  }
+
+  public StorageManager getStorageManager() {
+    return sm;
+  }
+
+  /** Returns the subquery of the child block that the given scan node reads from. */
+  public SubQuery getChildQuery(ScanNode scanForChild) {
+    return context.getSubQuery(block.getChildBlock(scanForChild).getId());
+  }
+
+  /** Returns the id of this subquery, which is the id of its execution block. */
+  public SubQueryId getId() {
+    return block.getId();
+  }
+
+  /** Returns a snapshot array of the tasks registered with this subquery. */
+  public QueryUnit[] getQueryUnits() {
+    return tasks.values().toArray(new QueryUnit[tasks.size()]);
+  }
+
+  public QueryUnit getQueryUnit(QueryUnitId qid) {
+    return tasks.get(qid);
+  }
+
+  public void setTableMeta(TableMeta meta) {
+    this.meta = meta;
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public TableMeta getTableMeta() {
+    return meta;
+  }
+
+  /** Returns the statistics of the output table meta; requires the meta to be set. */
+  public TableStat getTableStat() {
+    return this.meta.getStat();
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(this.getId());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof SubQuery) {
+      SubQuery other = (SubQuery) o;
+      return getId().equals(other.getId());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+
+  public int compareTo(SubQuery other) {
+    return getId().compareTo(other.getId());
+  }
+
+  /** Returns the current state of the state machine, read under the read lock. */
+  public SubQueryState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Builds the statistics of a union block by summing the statistics of its child subqueries.
+   * Column statistics of all children are concatenated.
+   */
+  private static TableStat computeStatFromUnionBlock(SubQuery unit) {
+    TableStat stat = new TableStat();
+    long avgRows = 0, numBytes = 0, numRows = 0;
+    int numBlocks = 0, numPartitions = 0;
+    List<ColumnStat> columnStats = Lists.newArrayList();
+
+    for (ExecutionBlock childBlock : unit.getBlock().getChildBlocks()) {
+      SubQuery childSubQuery = unit.context.getSubQuery(childBlock.getId());
+      TableStat childStat = childSubQuery.getTableStat();
+      avgRows += childStat.getAvgRows();
+      columnStats.addAll(childStat.getColumnStats());
+      numBlocks += childStat.getNumBlocks();
+      numBytes += childStat.getNumBytes();
+      numPartitions += childStat.getNumPartitions();
+      numRows += childStat.getNumRows();
+    }
+
+    stat.setColumnStats(columnStats);
+    stat.setNumBlocks(numBlocks);
+    stat.setNumBytes(numBytes);
+    stat.setNumPartitions(numPartitions);
+    stat.setNumRows(numRows);
+    stat.setAvgRows(avgRows);
+    return stat;
+  }
+
+  /**
+   * Records the finish time, computes the output statistics (from the children for a union
+   * block, otherwise from the tasks), writes the table meta and stores it on this subquery.
+   *
+   * @return the written table meta, carrying the computed statistics
+   * @throws IOException if writing the table meta fails
+   */
+  public TableMeta buildTableMeta() throws IOException {
+    setFinishTime();
+
+    TableStat stat = block.hasUnion() ? computeStatFromUnionBlock(this) : computeStatFromTasks();
+    // writeStat() attaches the statistics to the meta it returns.
+    TableMeta meta = writeStat(this, stat);
+    setTableMeta(meta);
+    return meta;
+  }
+
+  /** Aggregates the statistics reported by every task of this subquery. */
+  private TableStat computeStatFromTasks() {
+    List<TableStat> stats = Lists.newArrayList();
+    for (QueryUnit unit : getQueryUnits()) {
+      stats.add(unit.getStats());
+    }
+    return StatisticsUtil.aggregateTableStat(stats);
+  }
+
+  /** Creates the output table meta with {@code stat} attached and writes it to storage. */
+  private TableMeta writeStat(SubQuery subQuery, TableStat stat) throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    StoreTableNode storeTableNode = execBlock.getStoreTableNode();
+    TableMeta meta = toTableMeta(storeTableNode);
+    meta.setStat(stat);
+    sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
+    return meta;
+  }
+
+  /** Builds a table meta from the store node's output schema, storage type and options. */
+  private static TableMeta toTableMeta(StoreTableNode store) {
+    if (store.hasOptions()) {
+      return CatalogUtil.newTableMeta(store.getOutSchema(),
+          store.getStorageType(), store.getOptions());
+    } else {
+      return CatalogUtil.newTableMeta(store.getOutSchema(),
+          store.getStorageType());
+    }
+  }
+
+  /**
+   * Stops the task scheduler, if one was started. According to the original design note,
+   * launched TaskRunners are told to die (the 'shouldDie' message) through the task requests
+   * they send. NOTE(review): that behavior lives in TaskSchedulerImpl; confirm there.
+   */
+  private void stopScheduler() {
+    if (taskScheduler != null) {
+      taskScheduler.stop();
+    }
+  }
+
+  /** Asks the event handler to clean up every container allocated to this subquery. */
+  private void releaseContainers() {
+    // If there are still live TaskRunners, try to kill the containers.
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(),
+        containers.values()));
+  }
+
+  /**
+   * Builds the output table meta and emits a success event. If the meta cannot be built, the
+   * error is logged and the success event carries a {@code null} meta (unchanged behavior).
+   */
+  private void finish() {
+    TableMeta meta = null;
+    try {
+      meta = buildTableMeta();
+    } catch (IOException e) {
+      LOG.error("Failed to build the table meta of SubQuery (" + getId() + ")", e);
+    }
+
+    setTableMeta(meta);
+    setFinishTime();
+    eventHandler.handle(new SubQuerySucceeEvent(getId(), meta));
+  }
+
+  /**
+   * Applies {@code event} to the state machine under the write lock. An invalid transition is
+   * logged and converted into an SQ_INTERNAL_ERROR event, which every state accepts.
+   */
+  @Override
+  public void handle(SubQueryEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType());
+    }
+
+    // Lock before entering try so that unlock() only runs when the lock is actually held.
+    writeLock.lock();
+    try {
+      SubQueryState oldState = getState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new SubQueryEvent(getId(),
+            SubQueryEventType.SQ_INTERNAL_ERROR));
+      }
+
+      // notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+        SubQueryState newState = getState();
+        if (oldState != newState) {
+          LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to " + newState);
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Handles SQ_INIT. Finishes a union block immediately. Otherwise it sets repartitioning,
+   * creates tasks, and then either finishes (no tasks) or starts the scheduler and requests
+   * containers.
+   */
+  private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
+      SubQueryEvent, SubQueryState> {
+
+    @Override
+    public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      subQuery.setStartTime();
+      ExecutionBlock execBlock = subQuery.getBlock();
+
+      try {
+        // Union operator does not require actual query processing. It is performed logically.
+        if (execBlock.hasUnion()) {
+          subQuery.finish();
+          return SubQueryState.SUCCEEDED;
+        }
+
+        setRepartitionIfNecessary(subQuery);
+        createTasks(subQuery);
+
+        if (subQuery.tasks.isEmpty()) { // if there is no tasks
+          subQuery.finish();
+          return SubQueryState.SUCCEEDED;
+        }
+
+        initTaskScheduler(subQuery);
+        allocateContainers(subQuery);
+        return SubQueryState.INIT;
+      } catch (Exception e) {
+        LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
+        subQuery.eventHandler.handle(
+            new QueryDiagnosticsUpdateEvent(subQuery.getId().getQueryId(), e.getMessage()));
+        subQuery.eventHandler.handle(
+            new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.FAILED));
+        return SubQueryState.FAILED;
+      }
+    }
+
+    /** Creates, initializes and starts a task scheduler for the subquery. */
+    private void initTaskScheduler(SubQuery subQuery) {
+      subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.context);
+      subQuery.taskScheduler.init(subQuery.context.getConf());
+      subQuery.taskScheduler.start();
+    }
+
+    /**
+     * If a parent block requires a repartition operation, the method sets proper repartition
+     * methods and the number of partitions to a given subquery.
+     */
+    private static void setRepartitionIfNecessary(SubQuery subQuery) {
+      if (subQuery.getBlock().hasParentBlock()) {
+        int numTasks = calculatePartitionNum(subQuery);
+        Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
+      }
+    }
+
+    /**
+     * Determines the desired number of partitions from the volume of input data. This is used
+     * to determine the partition number of a hash join or aggregation.
+     *
+     * @param subQuery the child subquery whose output will be repartitioned
+     * @return the number of partitions; always at least 1, because a zero volume statistic
+     *         (e.g. an empty or unmeasured input) must not yield zero partitions
+     */
+    public static int calculatePartitionNum(SubQuery subQuery) {
+      TajoConf conf = subQuery.context.getConf();
+      ExecutionBlock parent = subQuery.getBlock().getParentBlock();
+
+      GroupbyNode grpNode = null;
+      if (parent != null) {
+        grpNode = (GroupbyNode) PlannerUtil.findTopNode(
+            parent.getPlan(), ExprType.GROUP_BY);
+      }
+
+      // Is this subquery the first step of join?
+      if (parent != null && parent.getScanNodes().length == 2) {
+        Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
+
+        // for outer
+        ExecutionBlock outer = child.next();
+        long outerVolume = getInputVolume(subQuery.context, outer);
+
+        // for inner
+        ExecutionBlock inner = child.next();
+        long innerVolume = getInputVolume(subQuery.context, inner);
+        LOG.info("Outer volume: " + Math.ceil((double) outerVolume / 1048576));
+        LOG.info("Inner volume: " + Math.ceil((double) innerVolume / 1048576));
+
+        long smaller = Math.min(outerVolume, innerVolume);
+
+        int mb = (int) Math.ceil((double) smaller / 1048576);
+        LOG.info("Smaller Table's volume is approximately " + mb + " MB");
+        // determine the number of task
+        int taskNum = Math.max(1, (int) Math.ceil((double) mb /
+            conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME)));
+        LOG.info("The determined number of join partitions is " + taskNum);
+        return taskNum;
+
+        // Is this subquery the first step of group-by?
+      } else if (grpNode != null) {
+
+        if (grpNode.getGroupingColumns().length == 0) {
+          return 1;
+        } else {
+          long volume = getInputVolume(subQuery.context, subQuery.block);
+
+          int mb = (int) Math.ceil((double) volume / 1048576);
+          LOG.info("Table's volume is approximately " + mb + " MB");
+          // determine the number of task
+          int taskNum = Math.max(1, (int) Math.ceil((double) mb /
+              conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME)));
+          LOG.info("The determined number of aggregation partitions is " + taskNum);
+          return taskNum;
+        }
+      } else {
+        LOG.info("============>>>>> Unexpected Case! <<<<<================");
+        long volume = getInputVolume(subQuery.context, subQuery.block);
+
+        int mb = (int) Math.ceil((double) volume / 1048576);
+        LOG.info("Table's volume is approximately " + mb + " MB");
+        // determine the number of task per 128MB
+        int taskNum = Math.max(1, (int) Math.ceil((double) mb / 128));
+        LOG.info("The determined number of partitions is " + taskNum);
+        return taskNum;
+      }
+    }
+
+    /**
+     * Creates the tasks of the subquery and registers them. Leaf tasks are built from
+     * fragments for a single scan. Join tasks are built for multiple scans. Otherwise
+     * non-leaf tasks are built from the first child subquery.
+     */
+    private static void createTasks(SubQuery subQuery) throws IOException {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit[] tasks;
+      if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
+        tasks = createLeafTasks(subQuery);
+
+      } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
+        tasks = Repartitioner.createJoinTasks(subQuery);
+
+      } else { // Case 3: Others (Sort or Aggregation)
+        int numTasks = getNonLeafTaskNum(subQuery);
+        SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId();
+        SubQuery child = subQuery.context.getSubQuery(childId);
+        tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
+      }
+
+      LOG.info("Create " + tasks.length + " Tasks");
+
+      for (QueryUnit task : tasks) {
+        subQuery.addTask(task);
+      }
+    }
+
+    /**
+     * Determines the desired number of non-leaf tasks from the input volume: one task per
+     * 64MB, rounded up. A zero volume yields zero tasks.
+     */
+    public static int getNonLeafTaskNum(SubQuery subQuery) {
+      // Getting intermediate data size
+      long volume = getInputVolume(subQuery.context, subQuery.getBlock());
+
+      int mb = (int) Math.ceil((double) volume / 1048576);
+      LOG.info("Table's volume is approximately " + mb + " MB");
+      // determine the number of task per 64MB
+      int maxTaskNum = (int) Math.ceil((double) mb / 64);
+      LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
+      return maxTaskNum;
+    }
+
+    /**
+     * Returns the input volume in bytes of an execution block. For a leaf block this is the
+     * byte count in the statistics of its first scanned table. Otherwise it is the sum of the
+     * byte counts in the table statistics of its child subqueries.
+     */
+    public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) {
+      Map<String, TableDesc> tableMap = context.getTableDescMap();
+      if (execBlock.isLeafBlock()) {
+        ScanNode outerScan = execBlock.getScanNodes()[0];
+        TableStat stat = tableMap.get(outerScan.getFromTable().getTableName()).getMeta().getStat();
+        return stat.getNumBytes();
+      } else {
+        long aggregatedVolume = 0;
+        for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
+          SubQuery subquery = context.getSubQuery(childBlock.getId());
+          aggregatedVolume += subquery.getTableStat().getNumBytes();
+        }
+
+        return aggregatedVolume;
+      }
+    }
+
+    /**
+     * Requests min(#tasks, 4 * #cluster nodes) containers of 2000 MB each, at the subquery's
+     * priority, by emitting a CONTAINER_REQ allocation event.
+     */
+    public static void allocateContainers(SubQuery subQuery) {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit[] tasks = subQuery.getQueryUnits();
+
+      int numClusterNodes = subQuery.getContext().getNumClusterNode();
+      int numRequest = Math.min(tasks.length, numClusterNodes * 4);
+
+      final Resource resource = Records.newRecord(Resource.class);
+      // TODO - for each different subquery, the volume of resource should be different.
+      resource.setMemory(2000);
+
+      Priority priority = Records.newRecord(Priority.class);
+      priority.setPriority(subQuery.getPriority());
+      ContainerAllocationEvent event =
+          new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
+              subQuery.getId(), priority, resource, numRequest,
+              execBlock.isLeafBlock(), 0.0f);
+      subQuery.eventHandler.handle(event);
+    }
+
+    /**
+     * Creates one task per input fragment of the single scanned table. If the table path
+     * contains a "data" subdirectory, the fragments are taken from there.
+     */
+    private static QueryUnit[] createLeafTasks(SubQuery subQuery) throws IOException {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      ScanNode[] scans = execBlock.getScanNodes();
+      Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+
+      ScanNode scan = scans[0];
+      TableDesc desc = subQuery.context.getTableDescMap().get(scan.getFromTable().getTableName());
+      Path inputPath = desc.getPath();
+      TableMeta meta = desc.getMeta();
+
+      // TODO - should be change the inner directory
+      Path oldPath = new Path(inputPath, "data");
+      FileSystem fs = inputPath.getFileSystem(subQuery.context.getConf());
+      if (fs.exists(oldPath)) {
+        inputPath = oldPath;
+      }
+      List<Fragment> fragments =
+          subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath);
+
+      List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
+      int taskId = 0;
+      for (Fragment fragment : fragments) {
+        queryUnits.add(newQueryUnit(subQuery, taskId++, fragment));
+      }
+
+      return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
+    }
+
+    /** Creates a task bound to the block's logical plan and the given fragment. */
+    private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit unit = new QueryUnit(
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+          subQuery.eventHandler);
+      unit.setLogicalPlan(execBlock.getPlan());
+      unit.setFragment2(fragment);
+      return unit;
+    }
+  }
+
+  /**
+   * Handles SQ_CONTAINER_ALLOCATED. Registers the allocated containers, asks for them to be
+   * launched, and emits SQ_START.
+   */
+  private static class ContainerLaunchTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent event) {
+      SubQueryContainerAllocationEvent allocationEvent =
+          (SubQueryContainerAllocationEvent) event;
+      for (Container container : allocationEvent.getAllocatedContainer()) {
+        // put() returns the previous mapping, so a non-null result means a duplicate id.
+        if (subQuery.containers.put(container.getId(), container) != null) {
+          LOG.info(">>>>>>>>>>>> Duplicate Container! <<<<<<<<<<<");
+        } else {
+          // TODO - This is debugging message. Should be removed
+          subQuery.i++;
+        }
+      }
+      LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size()
+          + " containers!");
+      subQuery.eventHandler.handle(
+          new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
+              subQuery.getId(), allocationEvent.getAllocatedContainer()));
+
+      subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
+          SubQueryEventType.SQ_START));
+    }
+  }
+
+  /** Handles SQ_START by emitting a T_SCHEDULE event for every task. */
+  private static class StartTransition implements
+      MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+
+    @Override
+    public SubQueryState transition(SubQuery subQuery,
+                                    SubQueryEvent subQueryEvent) {
+      // schedule tasks
+      try {
+        for (QueryUnitId taskId : subQuery.tasks.keySet()) {
+          subQuery.eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
+        }
+
+        return SubQueryState.RUNNING;
+      } catch (Exception e) {
+        LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
+        return SubQueryState.FAILED;
+      }
+    }
+  }
+
+  /**
+   * Handles SQ_TASK_COMPLETED. Counts the completion and emits SQ_SUBQUERY_COMPLETED once
+   * every task has completed.
+   */
+  private static class TaskCompletedTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery,
+                           SubQueryEvent event) {
+      subQuery.completedTaskCount++;
+      SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
+      QueryUnit unit = subQuery.getQueryUnit(taskEvent.getTaskId());
+      // Guard the log-only lookup so a missing task/attempt cannot abort the transition.
+      QueryUnitAttempt task = (unit == null) ? null : unit.getSuccessfulAttempt();
+      String host = (task == null) ? "an unknown host" : String.valueOf(task.getHost());
+
+      LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount + "/"
+          + subQuery.tasks.size() + " on " + host);
+      if (subQuery.completedTaskCount == subQuery.tasks.size()) {
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
+            SubQueryEventType.SQ_SUBQUERY_COMPLETED));
+      }
+    }
+  }
+
+  /**
+   * Handles SQ_SUBQUERY_COMPLETED. Stops the scheduler, releases containers, and finishes
+   * the subquery.
+   */
+  private static class SubQueryCompleteTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      // TODO - Commit subQuery & do cleanup
+      // TODO - records succeeded, failed, killed completed task
+      // TODO - records metrics
+      subQuery.stopScheduler();
+      subQuery.releaseContainers();
+      subQuery.finish();
+    }
+  }
+
+  /**
+   * Handles SQ_FAILED and SQ_INTERNAL_ERROR on the way to FAILED. Stops the scheduler,
+   * releases any containers, and reports the subquery as FAILED, so that resources are not
+   * leaked and the query is not left waiting.
+   */
+  private static class InternalErrorTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery,
+                           SubQueryEvent subQueryEvent) {
+      LOG.error("SubQuery (" + subQuery.getId() + ") failed by " + subQueryEvent.getType());
+      subQuery.stopScheduler();
+      if (!subQuery.containers.isEmpty()) {
+        subQuery.releaseContainers();
+      }
+      subQuery.setFinishTime();
+      subQuery.abortSubQuery(SubQueryState.FAILED);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
new file mode 100644
index 0000000..c8256ec
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+/**
+ * Lifecycle states of a SubQuery's state machine. Declaration order (and therefore
+ * ordinal values) is significant and must not change.
+ */
+public enum SubQueryState {
+  /** Initial state, before SQ_INIT has been handled. */
+  NEW,
+  /** At least one container has been allocated for the subquery's tasks. */
+  CONTAINER_ALLOCATED,
+  /** Tasks have been created and containers requested. */
+  INIT,
+  /** Tasks have been scheduled and are executing. */
+  RUNNING,
+  /** The subquery completed successfully. */
+  SUCCEEDED,
+  /** The subquery failed. */
+  FAILED;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
index c615532..c01cabc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
@@ -25,17 +25,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.client.AMRMClientImpl;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.tajo.SubQueryId;
-import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.master.QueryMaster.QueryContext;
-import org.apache.tajo.master.SubQueryState;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.master.querymaster.SubQueryState;
import java.util.HashMap;
import java.util.List;
@@ -115,9 +116,10 @@ public class RMContainerAllocator extends AMRMClientImpl
if (!stopped.get()) {
LOG.warn("Allocated thread interrupted. Returning.");
}
- return;
+ break;
}
}
+ LOG.info("Allocated thread stopped");
}
});
allocatorThread.setName("RMContainerAllocator");
@@ -126,24 +128,9 @@ public class RMContainerAllocator extends AMRMClientImpl
public void stop() {
stopped.set(true);
+ allocatorThread.interrupt();
+ LOG.info("RMContainerAllocator stopped");
super.stop();
- FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
- QueryState state = context.getQuery().getState();
- if (state == QueryState.QUERY_SUCCEEDED) {
- finishState = FinalApplicationStatus.SUCCEEDED;
- } else if (state == QueryState.QUERY_KILLED
- || (state == QueryState.QUERY_RUNNING)) {
- finishState = FinalApplicationStatus.KILLED;
- } else if (state == QueryState.QUERY_FAILED
- || state == QueryState.QUERY_ERROR) {
- finishState = FinalApplicationStatus.FAILED;
- }
-
- try {
- unregisterApplicationMaster(finishState, "", "http://localhost:1234");
- } catch (YarnRemoteException e) {
- LOG.error(e);
- }
}
private final Map<Priority, SubQueryId> subQueryMap =
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 9e281c6..a56841b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -37,7 +37,7 @@ import org.apache.tajo.TaskAttemptContext;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.engine.MasterWorkerProtos.*;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
import org.apache.tajo.engine.exception.UnfinishedTaskException;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.PlannerUtil;
@@ -45,7 +45,7 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.SortNode;
import org.apache.tajo.engine.planner.logical.StoreTableNode;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
import org.apache.tajo.master.ExecutionBlock.PartitionType;
import org.apache.tajo.rpc.NullCallback;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index ca8ef43..a41b280 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -37,11 +37,9 @@ import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.SubQueryId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.ipc.MasterWorkerProtocol;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.*;
import org.apache.tajo.rpc.CallFuture2;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.ProtoAsyncRpcClient;
@@ -53,7 +51,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.*;
-import static org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+import static org.apache.tajo.ipc.QueryMasterProtocol.*;
/**
* The driver class for Tajo QueryUnit processing.
@@ -72,7 +70,7 @@ public class TaskRunner extends AbstractService {
private final ContainerId containerId;
// Cluster Management
- private MasterWorkerProtocolService.Interface master;
+ private QueryMasterProtocol.QueryMasterProtocolService.Interface master;
// for temporal or intermediate files
private FileSystem localFS;
@@ -186,7 +184,7 @@ public class TaskRunner extends AbstractService {
return nodeId.toString();
}
- public MasterWorkerProtocolService.Interface getMaster() {
+ public QueryMasterProtocolService.Interface getMaster() {
return master;
}
@@ -223,7 +221,7 @@ public class TaskRunner extends AbstractService {
}
}
- static void fatalError(MasterWorkerProtocolService.Interface proxy,
+ static void fatalError(QueryMasterProtocolService.Interface proxy,
QueryUnitAttemptId taskAttemptId, String message) {
TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
.setId(taskAttemptId.getProto())
@@ -338,11 +336,11 @@ public class TaskRunner extends AbstractService {
/**
* TaskRunner takes 5 arguments as follows:
* <ol>
- * <li>1st: TaskRunnerListener hostname</li>
- * <li>2nd: TaskRunnerListener port</li>
- * <li>3nd: SubQueryId</li>
- * <li>4th: NodeId</li>
- * <li>5th: ContainerId</li>
+ * <li>1st: SubQueryId</li>
+ * <li>2nd: NodeId</li>
+ * <li>3rd: ContainerId</li>
+ * <li>4th: QueryMaster hostname</li>
+ * <li>5th: QueryMaster port</li>
* </ol>
*/
public static void main(String[] args) throws Exception {
@@ -356,17 +354,17 @@ public class TaskRunner extends AbstractService {
UserGroupInformation.setConfiguration(conf);
- // TaskRunnerListener's address
- String host = args[0];
- int port = Integer.parseInt(args[1]);
- final InetSocketAddress masterAddr =
- NetUtils.createSocketAddrForHost(host, port);
-
// SubQueryId from String
- final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[2]);
+ final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[0]);
// NodeId has a form of hostname:port.
- NodeId nodeId = ConverterUtils.toNodeId(args[3]);
- ContainerId containerId = ConverterUtils.toContainerId(args[4]);
+ NodeId nodeId = ConverterUtils.toNodeId(args[1]);
+ ContainerId containerId = ConverterUtils.toContainerId(args[2]);
+
+ // QueryMaster's address
+ String host = args[3];
+ int port = Integer.parseInt(args[4]);
+ final InetSocketAddress masterAddr =
+ NetUtils.createSocketAddrForHost(host, port);
// TODO - 'load credential' should be implemented
// Getting taskOwner
@@ -374,26 +372,29 @@ public class TaskRunner extends AbstractService {
UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
//taskOwner.addToken(token);
- // TaskRunnerListener RPC
+ // QueryMasterService RPC
ProtoAsyncRpcClient client;
- MasterWorkerProtocolService.Interface master;
+ QueryMasterProtocolService.Interface master;
// initialize MasterWorkerProtocol as an actual task owner.
client =
taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
@Override
public ProtoAsyncRpcClient run() throws Exception {
- return new ProtoAsyncRpcClient(MasterWorkerProtocol.class, masterAddr);
+ return new ProtoAsyncRpcClient(QueryMasterProtocol.class, masterAddr);
}
});
master = client.getStub();
TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
- taskRunner.init(conf);
- taskRunner.start();
- client.close();
- LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
- System.exit(0);
+ try {
+ taskRunner.init(conf);
+ taskRunner.start();
+ } finally {
+ client.close();
+ LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
+ System.exit(0);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
index c171c2b..6164553 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
@@ -25,9 +25,9 @@ option java_generate_equals_and_hash = true;
import "DataTypes.proto";
enum StoreType {
- MEM = 0;
- CSV = 1;
- RAW = 2;
+ MEM = 0;
+ CSV = 1;
+ RAW = 2;
RCFILE = 3;
ROWFILE = 4;
HCFILE = 5;
@@ -35,147 +35,147 @@ enum StoreType {
}
enum OrderType {
- ORDER_NONE = 0;
- ASC = 1;
- DSC = 2;
+ ORDER_NONE = 0;
+ ASC = 1;
+ DSC = 2;
}
enum CompressType {
- COMP_NONE = 0;
- NULL_SUPPRESS = 1;
- RUN_LENGTH = 2;
- BIT_VECTOR = 3;
- DICTIONARY = 4;
- SNAPPY = 5;
- LZ = 6;
+ COMP_NONE = 0;
+ NULL_SUPPRESS = 1;
+ RUN_LENGTH = 2;
+ BIT_VECTOR = 3;
+ DICTIONARY = 4;
+ SNAPPY = 5;
+ LZ = 6;
}
message ColumnMetaProto {
- required DataType dataType = 1;
- required bool compressed = 2;
- required bool sorted = 3;
- required bool contiguous = 4;
- required StoreType storeType = 5;
- required CompressType compType = 6;
- required int64 startRid = 7;
- required int32 recordNum = 8;
- required int32 offsetToIndex = 9;
+ required DataType dataType = 1;
+ required bool compressed = 2;
+ required bool sorted = 3;
+ required bool contiguous = 4;
+ required StoreType storeType = 5;
+ required CompressType compType = 6;
+ required int64 startRid = 7;
+ required int32 recordNum = 8;
+ required int32 offsetToIndex = 9;
}
message ColumnProto {
- required string columnName = 1;
- required DataType dataType = 2;
+ required string columnName = 1;
+ required DataType dataType = 2;
}
message SchemaProto {
- repeated ColumnProto fields = 1;
+ repeated ColumnProto fields = 1;
}
message KeyValueProto {
- required string key = 1;
- required string value = 2;
+ required string key = 1;
+ required string value = 2;
}
message KeyValueSetProto {
- repeated KeyValueProto keyval = 1;
+ repeated KeyValueProto keyval = 1;
}
message FragmentProto {
- required string id = 1;
- required string path = 2;
- required int64 startOffset = 3;
- required int64 length = 4;
- required TableProto meta = 5;
- optional TableStatProto stat = 6;
+ required string id = 1;
+ required string path = 2;
+ required int64 startOffset = 3;
+ required int64 length = 4;
+ required TableProto meta = 5;
+ optional TableStatProto stat = 6;
optional bool distCached = 7 [default = false];
}
message TableProto {
- required SchemaProto schema = 1;
- required StoreType storeType = 2;
- required KeyValueSetProto params = 3;
- optional TableStatProto stat = 4;
+ required SchemaProto schema = 1;
+ required StoreType storeType = 2;
+ required KeyValueSetProto params = 3;
+ optional TableStatProto stat = 4;
}
message TableDescProto {
- required string id = 1;
- required string path = 2;
- required TableProto meta = 3;
+ required string id = 1;
+ required string path = 2;
+ required TableProto meta = 3;
}
enum FunctionType {
- GENERAL = 0;
- AGGREGATION = 1;
+ GENERAL = 0;
+ AGGREGATION = 1;
}
message FunctionDescProto {
- required string signature = 1;
- required string className = 2;
- required FunctionType type = 3;
- repeated DataType parameterTypes = 4;
- required DataType returnType = 5;
+ required string signature = 1;
+ required string className = 2;
+ required FunctionType type = 3;
+ repeated DataType parameterTypes = 4;
+ required DataType returnType = 5;
}
message IndexDescProto {
- required string name = 1;
- required string tableId = 2;
- required ColumnProto column = 3;
- required IndexMethod indexMethod = 4;
- optional bool isUnique = 5 [default = false];
- optional bool isClustered = 6 [default = false];
- optional bool isAscending = 7 [default = false];
+ required string name = 1;
+ required string tableId = 2;
+ required ColumnProto column = 3;
+ required IndexMethod indexMethod = 4;
+ optional bool isUnique = 5 [default = false];
+ optional bool isClustered = 6 [default = false];
+ optional bool isAscending = 7 [default = false];
}
enum IndexMethod {
- TWO_LEVEL_BIN_TREE = 0;
- BTREE = 1;
- HASH = 2;
- BITMAP = 3;
+ TWO_LEVEL_BIN_TREE = 0;
+ BTREE = 1;
+ HASH = 2;
+ BITMAP = 3;
}
message GetAllTableNamesResponse {
- repeated string tableName = 1;
+ repeated string tableName = 1;
}
message GetIndexRequest {
- required string tableName = 1;
- required string columnName = 2;
+ required string tableName = 1;
+ required string columnName = 2;
}
message GetFunctionsResponse {
- repeated FunctionDescProto functionDesc = 1;
+ repeated FunctionDescProto functionDesc = 1;
}
message UnregisterFunctionRequest {
- required string signature = 1;
- repeated DataType parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message GetFunctionMetaRequest {
- required string signature = 1;
- repeated DataType parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message ContainFunctionRequest {
- required string signature = 1;
- repeated DataType parameterTypes = 2;
+ required string signature = 1;
+ repeated DataType parameterTypes = 2;
}
message TableStatProto {
- required int64 numRows = 1;
- required int64 numBytes = 2;
- optional int32 numBlocks = 3;
- optional int32 numPartitions = 4;
- optional int64 avgRows = 5;
- repeated ColumnStatProto colStat = 6;
+ required int64 numRows = 1;
+ required int64 numBytes = 2;
+ optional int32 numBlocks = 3;
+ optional int32 numPartitions = 4;
+ optional int64 avgRows = 5;
+ repeated ColumnStatProto colStat = 6;
}
message ColumnStatProto {
- required ColumnProto column = 1;
- optional int64 numDistVal = 2;
- optional int64 numNulls = 3;
- optional bytes minValue = 4;
- optional bytes maxValue = 5;
+ required ColumnProto column = 1;
+ optional int64 numDistVal = 2;
+ optional int64 numNulls = 3;
+ optional bytes minValue = 4;
+ optional bytes maxValue = 5;
}
enum StatType {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
index cbcccd3..61c14c4 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
@@ -49,7 +49,7 @@ message UpdateQueryResponse {
optional string errorMessage = 2;
}
-message SubmitQueryRespose {
+message SubmitQueryResponse {
required ResultCode resultCode = 1;
optional ApplicationAttemptIdProto queryId = 2;
optional string errorMessage = 3;
@@ -94,6 +94,8 @@ message GetQueryStatusResponse {
optional int64 finishTime = 7;
optional bool hasResult = 8;
optional string errorMessage = 9;
+ optional string queryMasterHost = 10;
+ optional int32 queryMasterPort = 11;
}
message GetClusterInfoRequest {
@@ -135,7 +137,7 @@ message TableResponse {
service ClientProtocolService {
rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
- rpc submitQuery(QueryRequest) returns (SubmitQueryRespose);
+ rpc submitQuery(QueryRequest) returns (SubmitQueryResponse);
rpc updateQuery(QueryRequest) returns (UpdateQueryResponse);
rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
rpc getQueryList(GetQueryListRequest) returns (GetQueryListResponse);
@@ -151,7 +153,6 @@ service ClientProtocolService {
rpc detachTable(StringProto) returns (BoolProto);
-
// TODO - to be implemented
//
// authenticate