Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:17:33 UTC

[16/16] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Conflicts:
	tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e04c65fd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e04c65fd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e04c65fd

Branch: refs/heads/index_support
Commit: e04c65fdde7587322e2b6c47b7b9ffe951d9e8bc
Parents: 071c5d0 1c29c1c
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Jan 9 01:17:02 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Jan 9 01:17:02 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |    4 +
 .../tajo/engine/function/builtin/AvgDouble.java |   25 +-
 .../tajo/engine/function/builtin/AvgFloat.java  |   12 +-
 .../tajo/engine/function/builtin/AvgInt.java    |   14 +-
 .../tajo/engine/function/builtin/AvgLong.java   |   27 +-
 .../tajo/engine/function/builtin/Max.java       |   76 +
 .../tajo/engine/function/builtin/MaxDouble.java |   43 +-
 .../tajo/engine/function/builtin/MaxFloat.java  |   43 +-
 .../tajo/engine/function/builtin/MaxInt.java    |   44 +-
 .../tajo/engine/function/builtin/MaxLong.java   |   32 +-
 .../tajo/engine/function/builtin/MaxString.java |   52 +-
 .../tajo/engine/function/builtin/Min.java       |   76 +
 .../tajo/engine/function/builtin/MinDouble.java |   41 +-
 .../tajo/engine/function/builtin/MinFloat.java  |   42 +-
 .../tajo/engine/function/builtin/MinInt.java    |   43 +-
 .../tajo/engine/function/builtin/MinLong.java   |   32 +-
 .../tajo/engine/function/builtin/MinString.java |   46 +-
 .../tajo/engine/function/builtin/SumDouble.java |   34 +-
 .../tajo/engine/function/builtin/SumFloat.java  |   33 +-
 .../tajo/engine/function/builtin/SumInt.java    |   32 +-
 .../tajo/engine/function/builtin/SumLong.java   |   34 +-
 .../DistinctGroupbySortAggregationExec.java     |   41 +-
 .../apache/tajo/engine/query/QueryContext.java  |    2 +-
 .../main/java/org/apache/tajo/ha/HAService.java |   56 +
 .../org/apache/tajo/ha/HAServiceHDFSImpl.java   |  316 +++++
 .../java/org/apache/tajo/ha/TajoMasterInfo.java |   89 ++
 .../tajo/master/AbstractTaskScheduler.java      |   56 -
 .../org/apache/tajo/master/ContainerProxy.java  |    2 +-
 .../tajo/master/DefaultTaskScheduler.java       |  928 ------------
 .../apache/tajo/master/FetchScheduleEvent.java  |   40 -
 .../org/apache/tajo/master/FragmentPair.java    |   73 -
 .../org/apache/tajo/master/GlobalEngine.java    |    2 +-
 .../NonForwardQueryResultFileScanner.java       |  164 ---
 .../master/NonForwardQueryResultScanner.java    |   46 -
 .../NonForwardQueryResultSystemScanner.java     |  600 --------
 .../java/org/apache/tajo/master/QueryInfo.java  |  235 +++
 .../org/apache/tajo/master/QueryJobManager.java |  311 ++++
 .../apache/tajo/master/ScheduledFetches.java    |   49 -
 .../apache/tajo/master/TajoContainerProxy.java  |    2 +-
 .../java/org/apache/tajo/master/TajoMaster.java |   11 +-
 .../tajo/master/TajoMasterClientService.java    |   13 +-
 .../apache/tajo/master/TajoMasterService.java   |    2 -
 .../tajo/master/TaskSchedulerContext.java       |   65 -
 .../tajo/master/TaskSchedulerFactory.java       |   69 -
 .../tajo/master/event/QueryCompletedEvent.java  |    2 +-
 .../tajo/master/event/QueryStartEvent.java      |    2 +-
 .../tajo/master/event/StageCompletedEvent.java  |    2 +-
 .../event/TaskAttemptToSchedulerEvent.java      |    2 +-
 .../apache/tajo/master/exec/DDLExecutor.java    |    1 -
 .../exec/NonForwardQueryResultFileScanner.java  |  164 +++
 .../exec/NonForwardQueryResultScanner.java      |   46 +
 .../NonForwardQueryResultSystemScanner.java     |  600 ++++++++
 .../apache/tajo/master/exec/QueryExecutor.java  |    9 +-
 .../org/apache/tajo/master/ha/HAService.java    |   56 -
 .../tajo/master/ha/HAServiceHDFSImpl.java       |  318 -----
 .../apache/tajo/master/ha/TajoMasterInfo.java   |   89 --
 .../master/metrics/CatalogMetricsGaugeSet.java  |   56 -
 .../metrics/WorkerResourceMetricsGaugeSet.java  |   74 -
 .../apache/tajo/master/querymaster/Query.java   |  783 ----------
 .../master/querymaster/QueryInProgress.java     |  300 ----
 .../tajo/master/querymaster/QueryInfo.java      |  235 ---
 .../tajo/master/querymaster/QueryJobEvent.java  |   45 -
 .../master/querymaster/QueryJobManager.java     |  310 ----
 .../tajo/master/querymaster/QueryMaster.java    |  631 --------
 .../querymaster/QueryMasterManagerService.java  |  263 ----
 .../master/querymaster/QueryMasterRunner.java   |  149 --
 .../master/querymaster/QueryMasterTask.java     |  647 ---------
 .../tajo/master/querymaster/Repartitioner.java  | 1251 ----------------
 .../apache/tajo/master/querymaster/Stage.java   | 1342 ------------------
 .../tajo/master/querymaster/StageState.java     |   30 -
 .../apache/tajo/master/querymaster/Task.java    |  907 ------------
 .../tajo/master/querymaster/TaskAttempt.java    |  443 ------
 .../master/rm/TajoWorkerResourceManager.java    |    3 +-
 .../tajo/master/rm/WorkerResourceManager.java   |    2 +-
 .../master/scheduler/QuerySchedulingInfo.java   |   55 +
 .../apache/tajo/master/scheduler/Scheduler.java |   41 +
 .../master/scheduler/SchedulingAlgorithms.java  |   47 +
 .../master/scheduler/SimpleFifoScheduler.java   |  147 ++
 .../master/session/InvalidSessionException.java |   25 -
 .../session/NoSuchSessionVariableException.java |   25 -
 .../org/apache/tajo/master/session/Session.java |  196 ---
 .../tajo/master/session/SessionConstants.java   |   23 -
 .../tajo/master/session/SessionEvent.java       |   34 -
 .../tajo/master/session/SessionEventType.java   |   24 -
 .../session/SessionLivelinessMonitor.java       |   53 -
 .../tajo/master/session/SessionManager.java     |  144 --
 .../tajo/metrics/CatalogMetricsGaugeSet.java    |   56 +
 .../metrics/WorkerResourceMetricsGaugeSet.java  |   74 +
 .../tajo/querymaster/AbstractTaskScheduler.java |   56 +
 .../tajo/querymaster/DefaultTaskScheduler.java  |  926 ++++++++++++
 .../tajo/querymaster/FetchScheduleEvent.java    |   40 +
 .../java/org/apache/tajo/querymaster/Query.java |  783 ++++++++++
 .../tajo/querymaster/QueryInProgress.java       |  301 ++++
 .../apache/tajo/querymaster/QueryJobEvent.java  |   46 +
 .../apache/tajo/querymaster/QueryMaster.java    |  631 ++++++++
 .../querymaster/QueryMasterManagerService.java  |  262 ++++
 .../tajo/querymaster/QueryMasterTask.java       |  650 +++++++++
 .../apache/tajo/querymaster/Repartitioner.java  | 1250 ++++++++++++++++
 .../java/org/apache/tajo/querymaster/Stage.java | 1342 ++++++++++++++++++
 .../org/apache/tajo/querymaster/StageState.java |   30 +
 .../java/org/apache/tajo/querymaster/Task.java  |  897 ++++++++++++
 .../apache/tajo/querymaster/TaskAttempt.java    |  443 ++++++
 .../tajo/querymaster/TaskSchedulerContext.java  |   65 +
 .../tajo/querymaster/TaskSchedulerFactory.java  |   68 +
 .../tajo/scheduler/QuerySchedulingInfo.java     |   55 -
 .../org/apache/tajo/scheduler/Scheduler.java    |   41 -
 .../tajo/scheduler/SchedulingAlgorithms.java    |   47 -
 .../tajo/scheduler/SimpleFifoScheduler.java     |  147 --
 .../tajo/session/InvalidSessionException.java   |   25 +
 .../session/NoSuchSessionVariableException.java |   25 +
 .../java/org/apache/tajo/session/Session.java   |  196 +++
 .../apache/tajo/session/SessionConstants.java   |   23 +
 .../org/apache/tajo/session/SessionEvent.java   |   34 +
 .../apache/tajo/session/SessionEventType.java   |   24 +
 .../tajo/session/SessionLivelinessMonitor.java  |   53 +
 .../org/apache/tajo/session/SessionManager.java |  144 ++
 .../main/java/org/apache/tajo/util/JSPUtil.java |   10 +-
 .../apache/tajo/util/history/HistoryReader.java |    2 +-
 .../apache/tajo/util/history/HistoryWriter.java |    2 +-
 .../java/org/apache/tajo/worker/FetchImpl.java  |    4 +-
 .../tajo/worker/TajoResourceAllocator.java      |    6 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |    6 +-
 .../tajo/worker/TajoWorkerClientService.java    |    2 +-
 tajo-core/src/main/resources/tajo-default.xml   |    2 +-
 .../resources/webapps/admin/catalogview.jsp     |    2 +-
 .../main/resources/webapps/admin/cluster.jsp    |    4 +-
 .../src/main/resources/webapps/admin/index.jsp  |    6 +-
 .../src/main/resources/webapps/admin/query.jsp  |    4 +-
 .../resources/webapps/admin/query_executor.jsp  |    2 +-
 .../src/main/resources/webapps/worker/index.jsp |    4 +-
 .../resources/webapps/worker/querydetail.jsp    |    4 +-
 .../main/resources/webapps/worker/queryplan.jsp |    6 +-
 .../resources/webapps/worker/querytasks.jsp     |    2 +-
 .../src/main/resources/webapps/worker/task.jsp  |    8 +-
 .../apache/tajo/LocalTajoTestingUtility.java    |    2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |    8 +-
 .../engine/function/TestBuiltinFunctions.java   |  234 +++
 .../tajo/engine/planner/TestLogicalPlanner.java |    2 +-
 .../planner/physical/TestPhysicalPlanner.java   |    2 +-
 .../tajo/engine/query/TestGroupByQuery.java     |   12 +-
 .../tajo/engine/query/TestJoinBroadcast.java    |    2 +-
 .../tajo/engine/query/TestTablePartitions.java  |    2 +-
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |  153 ++
 .../TestNonForwardQueryResultSystemScanner.java |    4 +-
 .../apache/tajo/master/TestRepartitioner.java   |    8 +-
 .../tajo/master/ha/TestHAServiceHDFSImpl.java   |  158 ---
 .../querymaster/TestIntermediateEntry.java      |   53 -
 .../tajo/master/querymaster/TestKillQuery.java  |  125 --
 .../master/querymaster/TestQueryProgress.java   |   75 -
 .../querymaster/TestTaskStatusUpdate.java       |  194 ---
 .../master/scheduler/TestFifoScheduler.java     |  116 ++
 .../tajo/querymaster/TestIntermediateEntry.java |   53 +
 .../apache/tajo/querymaster/TestKillQuery.java  |  125 ++
 .../tajo/querymaster/TestQueryProgress.java     |   75 +
 .../tajo/querymaster/TestTaskStatusUpdate.java  |  194 +++
 .../tajo/scheduler/TestFifoScheduler.java       |  116 --
 .../java/org/apache/tajo/util/TestJSPUtil.java  |    2 +-
 .../util/history/TestHistoryWriterReader.java   |    2 +-
 .../org/apache/tajo/worker/TestHistory.java     |    2 +-
 .../testAvgLongOverflow.sql                     |    1 +
 .../testAvgLongOverflow.result                  |    3 +
 .../testGroupByWithNullData2.result             |    2 +-
 .../testGroupByWithNullData3.result             |    2 +-
 .../testGroupByWithNullData4.result             |    2 +-
 .../testGroupByWithNullData6.result             |    2 +-
 .../testGroupByWithNullData7.result             |    2 +-
 .../testGroupByWithNullData8.result             |    2 +-
 .../testLeftOuterJoinWithEmptyTable2.result     |   10 +-
 .../testLeftOuterJoinWithEmptyTable4.result     |    2 +-
 .../testLeftOuterJoinWithEmptyTable2.result     |   10 +-
 .../testLeftOuterJoinWithEmptyTable4.result     |    2 +-
 .../testLeftOuterJoinWithEmptyTable5.result     |    4 +-
 tajo-dist/pom.xml                               |    8 +-
 173 files changed, 11976 insertions(+), 12138 deletions(-)
----------------------------------------------------------------------
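The bulk of this merge is a package flattening: the querymaster, session, and scheduler classes (plus the HA and metrics classes) move out of org.apache.tajo.master into top-level org.apache.tajo packages, which is why the insertion and deletion counts above nearly mirror each other. Purely as an illustration of the rename (not taken from the commit itself), a caller's imports would change along these lines:

    // before this merge
    import org.apache.tajo.master.querymaster.QueryMasterTask;
    import org.apache.tajo.master.session.Session;

    // after this merge
    import org.apache.tajo.querymaster.QueryMasterTask;
    import org.apache.tajo.session.Session;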


http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 0000000,f645dc5..00a5362
mode 000000,100644..100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@@ -1,0 -1,616 +1,600 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.master.exec;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Stack;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.tajo.QueryId;
+ import org.apache.tajo.TaskAttemptId;
+ import org.apache.tajo.TaskId;
+ import org.apache.tajo.catalog.CatalogUtil;
+ import org.apache.tajo.catalog.Column;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.TableDesc;
+ import org.apache.tajo.catalog.TableMeta;
 -import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 -import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
 -import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
++import org.apache.tajo.catalog.proto.CatalogProtos.*;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.common.TajoDataTypes.DataType;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.datum.DatumFactory;
+ import org.apache.tajo.engine.codegen.CompilationError;
+ import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+ import org.apache.tajo.engine.planner.Projector;
+ import org.apache.tajo.engine.planner.global.ExecutionBlock;
+ import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+ import org.apache.tajo.engine.planner.global.GlobalPlanner;
+ import org.apache.tajo.engine.planner.global.MasterPlan;
+ import org.apache.tajo.engine.planner.physical.PhysicalExec;
+ import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.master.TajoMaster.MasterContext;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.PlanningException;
+ import org.apache.tajo.plan.expr.EvalNode;
+ import org.apache.tajo.plan.logical.IndexScanNode;
+ import org.apache.tajo.plan.logical.LogicalNode;
+ import org.apache.tajo.plan.logical.ScanNode;
+ import org.apache.tajo.storage.RowStoreUtil;
+ import org.apache.tajo.storage.Tuple;
+ import org.apache.tajo.storage.VTuple;
+ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+ import org.apache.tajo.util.KeyValueSet;
+ import org.apache.tajo.util.TUtil;
+ import org.apache.tajo.worker.TaskAttemptContext;
+ 
+ import com.google.protobuf.ByteString;
+ 
+ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
+   
+   private final Log LOG = LogFactory.getLog(getClass());
+   
+   private MasterContext masterContext;
+   private LogicalPlan logicalPlan;
+   private final QueryId queryId;
+   private final String sessionId;
+   private TaskAttemptContext taskContext;
+   private int currentRow;
+   private long maxRow;
+   private TableDesc tableDesc;
+   private Schema outSchema;
+   private RowStoreEncoder encoder;
+   private PhysicalExec physicalExec;
+   
+   public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId, 
+       String sessionId, int maxRow) {
+     masterContext = context;
+     logicalPlan = plan;
+     this.queryId = queryId;
+     this.sessionId = sessionId;
+     this.maxRow = maxRow;
+     
+   }
+   
+   @Override
+   public void init() throws IOException {
+     QueryContext queryContext = new QueryContext(masterContext.getConf());
+     currentRow = 0;
+     
+     MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
+     GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
+     try {
+       globalPlanner.build(masterPlan);
+     } catch (PlanningException e) {
+       throw new RuntimeException(e);
+     }
+     
+     ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
+     ExecutionBlock leafBlock = null;
+     while (cursor.hasNext()) {
+       ExecutionBlock block = cursor.nextBlock();
+       if (masterPlan.isLeaf(block)) {
+         leafBlock = block;
+         break;
+       }
+     }
+     
+     taskContext = new TaskAttemptContext(queryContext, null,
+         new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0),
+         null, null);
+     physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf())
+       .createPlan(taskContext, leafBlock.getPlan());
+     
+     tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(), 
+         new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null);
+     outSchema = physicalExec.getSchema();
+     encoder = RowStoreUtil.createEncoder(getLogicalSchema());
+     
+     physicalExec.init();
+   }
+ 
+   @Override
+   public void close() throws Exception {
+     tableDesc = null;
+     outSchema = null;
+     encoder = null;
+     if (physicalExec != null) {
+       try {
+         physicalExec.close();
+       } catch (Exception ignored) {}
+     }
+     physicalExec = null;
+     currentRow = -1;
+   }
+   
+   private List<Tuple> getTablespaces(Schema outSchema) {
+     List<TablespaceProto> tablespaces = masterContext.getCatalog().getAllTablespaces();
+     List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size());
+     List<Column> columns = outSchema.getColumns();
+     Tuple aTuple;
+     
+     for (TablespaceProto tablespace: tablespaces) {
+       aTuple = new VTuple(outSchema.size());
+       
+       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+         Column column = columns.get(fieldId);
+         if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
+           if (tablespace.hasId()) {
+             aTuple.put(fieldId, DatumFactory.createInt4(tablespace.getId()));
+           } else {
+             aTuple.put(fieldId, DatumFactory.createNullDatum());
+           }
+         } else if ("space_name".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName()));
+         } else if ("space_handler".equalsIgnoreCase(column.getSimpleName())) {
+           if (tablespace.hasHandler()) {
+             aTuple.put(fieldId, DatumFactory.createText(tablespace.getHandler()));
+           } else {
+             aTuple.put(fieldId, DatumFactory.createNullDatum());
+           }
+         } else if ("space_uri".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri()));
+         }
+       }
+       tuples.add(aTuple);
+     }
+     
+     return tuples;    
+   }
+   
+   private List<Tuple> getDatabases(Schema outSchema) {
+     List<DatabaseProto> databases = masterContext.getCatalog().getAllDatabases();
+     List<Tuple> tuples = new ArrayList<Tuple>(databases.size());
+     List<Column> columns = outSchema.getColumns();
+     Tuple aTuple;
+     
+     for (DatabaseProto database: databases) {
+       aTuple = new VTuple(outSchema.size());
+       
+       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+         Column column = columns.get(fieldId);
+         if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(database.getId()));
+         } else if ("db_name".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(database.getName()));
+         } else if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
+           if (database.hasSpaceId()) {
+             aTuple.put(fieldId, DatumFactory.createInt4(database.getSpaceId()));
+           } else {
+             aTuple.put(fieldId, DatumFactory.createNullDatum());
+           }
+         }
+       }
+       
+       tuples.add(aTuple);
+     }
+     
+     return tuples;
+   }
+   
+   private List<Tuple> getTables(Schema outSchema) {
+     List<TableDescriptorProto> tables = masterContext.getCatalog().getAllTables();
+     List<Tuple> tuples = new ArrayList<Tuple>(tables.size());
+     List<Column> columns = outSchema.getColumns();
+     Tuple aTuple;
+     
+     for (TableDescriptorProto table: tables) {
+       aTuple = new VTuple(outSchema.size());
+       
+       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+         Column column = columns.get(fieldId);
+         if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(table.getTid()));
+         } else if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(table.getDbId()));
+         } else if ("table_name".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(table.getName()));
+         } else if ("table_type".equalsIgnoreCase(column.getSimpleName())) {
+           if (table.hasTableType()) {
+             aTuple.put(fieldId, DatumFactory.createText(table.getTableType()));
+           } else {
+             aTuple.put(fieldId, DatumFactory.createNullDatum());
+           }
+         } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(table.getPath()));
+         } else if ("store_type".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(table.getStoreType()));
+         }
+       }
+       
+       tuples.add(aTuple);
+     }
+     
+     return tuples;
+   }
+   
+   private List<Tuple> getColumns(Schema outSchema) {
+     List<ColumnProto> columnsList = masterContext.getCatalog().getAllColumns();
+     List<Tuple> tuples = new ArrayList<Tuple>(columnsList.size());
+     List<Column> columns = outSchema.getColumns();
+     Tuple aTuple;
+     int columnId = 1, prevtid = -1, tid = 0;
+     
+     for (ColumnProto column: columnsList) {
+       aTuple = new VTuple(outSchema.size());
+       
+       tid = column.getTid();
+       if (prevtid != tid) {
+         columnId = 1;
+         prevtid = tid;
+       }
+       
+       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+         Column colObj = columns.get(fieldId);
+         
+         if ("tid".equalsIgnoreCase(colObj.getSimpleName())) {
+           if (column.hasTid()) {
+             aTuple.put(fieldId, DatumFactory.createInt4(tid));
+           } else {
+             aTuple.put(fieldId, DatumFactory.createNullDatum());
+           }
+         } else if ("column_name".equalsIgnoreCase(colObj.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(column.getName()));
+         } else if ("ordinal_position".equalsIgnoreCase(colObj.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(columnId));
+         } else if ("data_type".equalsIgnoreCase(colObj.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(column.getDataType().getType().toString()));
+         } else if ("type_length".equalsIgnoreCase(colObj.getSimpleName())) {
+           DataType dataType = column.getDataType();
+           if (dataType.hasLength()) {
+             aTuple.put(fieldId, DatumFactory.createInt4(dataType.getLength()));
+           } else {
+             aTuple.put(fieldId, DatumFactory.createNullDatum());
+           }
+         }
+       }
+       
+       columnId++;
+       tuples.add(aTuple);
+     }
+     
+     return tuples;
+   }
 -  
++
+   private List<Tuple> getIndexes(Schema outSchema) {
 -    List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
++    List<IndexDescProto> indexList = masterContext.getCatalog().getAllIndexes();
+     List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
+     List<Column> columns = outSchema.getColumns();
+     Tuple aTuple;
 -    
 -    for (IndexProto index: indexList) {
++
++    for (IndexDescProto index: indexList) {
+       aTuple = new VTuple(outSchema.size());
 -      
++
+       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+         Column column = columns.get(fieldId);
 -        
++
+         if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
 -          aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
++          aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getDbId()));
+         } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
 -          aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
++          aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getTid()));
+         } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
 -        } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
 -          aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
 -        } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
 -          aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
 -        } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
 -          aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
 -        } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
 -          aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
 -        } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
 -          aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
 -        } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
 -          aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
++        } else if ("index_method".equalsIgnoreCase(column.getSimpleName())) {
++          aTuple.put(fieldId, DatumFactory.createText(index.getIndexMethod().name()));
++        } else if ("index_path".equalsIgnoreCase(column.getSimpleName())) {
++          aTuple.put(fieldId, DatumFactory.createText(index.getIndexPath()));
+         }
+       }
 -      
++
+       tuples.add(aTuple);
+     }
 -    
++
+     return tuples;
+   }
+   
+   private List<Tuple> getAllTableOptions(Schema outSchema) {
+     List<TableOptionProto> optionList = masterContext.getCatalog().getAllTableOptions();
+     List<Tuple> tuples = new ArrayList<Tuple>(optionList.size());
+     List<Column> columns = outSchema.getColumns();
+     Tuple aTuple;
+     
+     for (TableOptionProto option: optionList) {
+       aTuple = new VTuple(outSchema.size());
+       
+       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+         Column column = columns.get(fieldId);
+         
+         if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(option.getTid()));
+         } else if ("key_".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getKey()));
+         } else if ("value_".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getValue()));
+         }
+       }
+       
+       tuples.add(aTuple);
+     }
+     
+     return tuples;
+   }
+   
+   private List<Tuple> getAllTableStats(Schema outSchema) {
+     List<TableStatsProto> statList = masterContext.getCatalog().getAllTableStats();
+     List<Tuple> tuples = new ArrayList<Tuple>(statList.size());
+     List<Column> columns = outSchema.getColumns();
+     Tuple aTuple;
+     
+     for (TableStatsProto stat: statList) {
+       aTuple = new VTuple(outSchema.size());
+       
+       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+         Column column = columns.get(fieldId);
+         
+         if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(stat.getTid()));
+         } else if ("num_rows".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumRows()));
+         } else if ("num_bytes".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumBytes()));
+         }
+       }
+       
+       tuples.add(aTuple);
+     }
+     
+     return tuples;
+   }
+   
+   private List<Tuple> getAllPartitions(Schema outSchema) {
+     List<TablePartitionProto> partitionList = masterContext.getCatalog().getAllPartitions();
+     List<Tuple> tuples = new ArrayList<Tuple>(partitionList.size());
+     List<Column> columns = outSchema.getColumns();
+     Tuple aTuple;
+     
+     for (TablePartitionProto partition: partitionList) {
+       aTuple = new VTuple(outSchema.size());
+       
+       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+         Column column = columns.get(fieldId);
+         
+         if ("pid".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid()));
+         } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid()));
+         } else if ("partition_name".equalsIgnoreCase(column.getSimpleName())) {
+           if (partition.hasPartitionName()) {
+             aTuple.put(fieldId, DatumFactory.createText(partition.getPartitionName()));
+           } else {
+             aTuple.put(fieldId, DatumFactory.createNullDatum());
+           }
+         } else if ("ordinal_position".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition()));
+         } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
+           aTuple.put(fieldId, DatumFactory.createText(partition.getPath()));
+         }
+       }
+       
+       tuples.add(aTuple);
+     }
+     
+     return tuples;
+   }
+   
+   private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
+     List<Tuple> tuples = null;
+     String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
+ 
+     if ("tablespace".equalsIgnoreCase(tableName)) {
+       tuples = getTablespaces(inSchema);
+     } else if ("databases".equalsIgnoreCase(tableName)) {
+       tuples = getDatabases(inSchema);
+     } else if ("tables".equalsIgnoreCase(tableName)) {
+       tuples = getTables(inSchema);
+     } else if ("columns".equalsIgnoreCase(tableName)) {
+       tuples = getColumns(inSchema);
+     } else if ("indexes".equalsIgnoreCase(tableName)) {
+       tuples = getIndexes(inSchema);
+     } else if ("table_options".equalsIgnoreCase(tableName)) {
+       tuples = getAllTableOptions(inSchema);
+     } else if ("table_stats".equalsIgnoreCase(tableName)) {
+       tuples = getAllTableStats(inSchema);
+     } else if ("partitions".equalsIgnoreCase(tableName)) {
+       tuples = getAllPartitions(inSchema);
+     }
+     
+     return tuples;    
+   }
+ 
+   @Override
+   public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+     List<ByteString> rows = new ArrayList<ByteString>();
+     int startRow = currentRow;
+     int endRow = startRow + fetchRowNum;
+     
+     if (physicalExec == null) {
+       return rows;
+     }
+     
+     while (currentRow < endRow) {
+       Tuple currentTuple = physicalExec.next();
+       
+       if (currentTuple == null) {
+         physicalExec.close();
+         physicalExec = null;
+         break;
+       }
+       
+       currentRow++;
+       rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple)));
+       
+       if (currentRow >= maxRow) {
+         physicalExec.close();
+         physicalExec = null;
+         break;
+       }
+     }
+     
+     return rows;
+   }
+ 
+   @Override
+   public QueryId getQueryId() {
+     return queryId;
+   }
+ 
+   @Override
+   public String getSessionId() {
+     return sessionId;
+   }
+   
+   @Override
+   public TableDesc getTableDesc() {
+     return tableDesc;
+   }
+   
+   @Override
+   public Schema getLogicalSchema() {
+     return outSchema;
+   }
+   
+   class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {
+ 
+     public SimplePhysicalPlannerImpl(TajoConf conf) {
+       super(conf);
+     }
+ 
+     @Override
+     public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+         throws IOException {
+       return new SystemPhysicalExec(ctx, scanNode);
+     }
+ 
+     @Override
+     public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException {
+       return new SystemPhysicalExec(ctx, annotation);
+     }
+   }
+   
+   class SystemPhysicalExec extends PhysicalExec {
+     
+     private ScanNode scanNode;
+     private EvalNode qual;
+     private Projector projector;
+     private TableStats tableStats;
+     private final List<Tuple> cachedData;
+     private int currentRow;
+     private boolean isClosed;
+ 
+     public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) {
+       super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+       this.scanNode = scanNode;
+       this.qual = this.scanNode.getQual();
+       cachedData = TUtil.newList();
+       currentRow = 0;
+       isClosed = false;
+       
+       projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
+     }
+ 
+     @Override
+     public Tuple next() throws IOException {
+       Tuple aTuple = null;
+       Tuple outTuple = new VTuple(outColumnNum);
+       
+       if (isClosed) {
+         return null;
+       }
+       
+       if (cachedData.size() == 0) {
+         rescan();
+       }
+       
+       if (!scanNode.hasQual()) {
+         if (currentRow < cachedData.size()) {
+           aTuple = cachedData.get(currentRow++);
+           projector.eval(aTuple, outTuple);
+           outTuple.setOffset(aTuple.getOffset());
+           return outTuple;
+         }
+         return null;
+       } else {
+         while (currentRow < cachedData.size()) {
+           aTuple = cachedData.get(currentRow++);
+           if (qual.eval(inSchema, aTuple).isTrue()) {
+             projector.eval(aTuple, outTuple);
+             return outTuple;
+           }
+         }
+         return null;
+       }
+     }
+ 
+     @Override
+     public void rescan() throws IOException {
+       cachedData.clear();
+       cachedData.addAll(fetchSystemTable(scanNode.getTableDesc(), inSchema));
+ 
+       tableStats = new TableStats();
+       tableStats.setNumRows(cachedData.size());
+     }
+ 
+     @Override
+     public void close() throws IOException {
+       scanNode = null;
+       qual = null;
+       projector = null;
+       cachedData.clear();
+       currentRow = -1;
+       isClosed = true;
+     }
+ 
+     @Override
+     public float getProgress() {
+       return 1.0f;
+     }
+ 
+     @Override
+     protected void compile() throws CompilationError {
+       if (scanNode.hasQual()) {
+         qual = context.getPrecompiledEval(inSchema, qual);
+       }
+     }
+ 
+     @Override
+     public TableStats getInputStats() {
+       return tableStats;
+     }
+     
+   }
+ 
+ }
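
For orientation, a minimal sketch of how a NonForwardQueryResultScanner such as the system scanner above is driven; masterContext, plan, queryId, and sessionId are assumed to be prepared by the caller, and process() is a hypothetical consumer:

    NonForwardQueryResultScanner scanner = new NonForwardQueryResultSystemScanner(
        masterContext, plan, queryId, sessionId, 100 /* maxRow */);
    scanner.init();                    // builds a MasterPlan and opens a SystemPhysicalExec
    try {
      List<ByteString> rows = scanner.getNextRows(50);
      while (!rows.isEmpty()) {
        process(rows);                 // each row is encoded against getLogicalSchema()
        rows = scanner.getNextRows(50);
      }
    } finally {
      scanner.close();                 // note: close() is declared to throw Exception
    }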

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 26476a3,2fbebc1..e7a3cf7
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@@ -42,12 -40,8 +42,9 @@@ import org.apache.tajo.engine.planner.p
  import org.apache.tajo.engine.planner.physical.StoreTableExec;
  import org.apache.tajo.engine.query.QueryContext;
  import org.apache.tajo.ipc.ClientProtos;
 +import org.apache.tajo.ipc.ClientProtos.ResultCode;
  import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
- import org.apache.tajo.master.NonForwardQueryResultFileScanner;
- import org.apache.tajo.master.NonForwardQueryResultScanner;
- import org.apache.tajo.master.NonForwardQueryResultSystemScanner;
- import org.apache.tajo.master.TajoMaster;
+ import org.apache.tajo.master.*;
  import org.apache.tajo.master.exec.prehook.CreateTableHook;
  import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
  import org.apache.tajo.master.exec.prehook.InsertIntoHook;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 0000000,2932694..4a8c188
mode 000000,100644..100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@@ -1,0 -1,738 +1,783 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.tajo.querymaster;
+ 
+ import com.google.common.collect.Maps;
+ import org.apache.commons.lang.exception.ExceptionUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.fs.ContentSummary;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.yarn.event.EventHandler;
+ import org.apache.hadoop.yarn.state.*;
+ import org.apache.hadoop.yarn.util.Clock;
+ import org.apache.tajo.ExecutionBlockId;
+ import org.apache.tajo.QueryId;
+ import org.apache.tajo.SessionVars;
+ import org.apache.tajo.TajoProtos.QueryState;
++import org.apache.tajo.catalog.*;
++import org.apache.tajo.catalog.exception.CatalogException;
+ import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
+ import org.apache.tajo.catalog.CatalogService;
+ import org.apache.tajo.catalog.TableDesc;
+ import org.apache.tajo.catalog.TableMeta;
+ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.engine.planner.global.ExecutionBlock;
+ import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+ import org.apache.tajo.engine.planner.global.MasterPlan;
+ import org.apache.tajo.plan.logical.*;
+ import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.master.event.*;
+ import org.apache.tajo.plan.util.PlannerUtil;
+ import org.apache.tajo.storage.StorageManager;
+ import org.apache.tajo.storage.StorageConstants;
+ import org.apache.tajo.util.TUtil;
+ import org.apache.tajo.util.history.QueryHistory;
+ import org.apache.tajo.util.history.StageHistory;
+ 
+ import java.io.IOException;
+ import java.util.*;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+ 
+ public class Query implements EventHandler<QueryEvent> {
+   private static final Log LOG = LogFactory.getLog(Query.class);
+ 
+   // Facilities for Query
+   private final TajoConf systemConf;
+   private final Clock clock;
+   private String queryStr;
+   private Map<ExecutionBlockId, Stage> stages;
+   private final EventHandler eventHandler;
+   private final MasterPlan plan;
+   QueryMasterTask.QueryMasterTaskContext context;
+   private ExecutionBlockCursor cursor;
+ 
+   // Query Status
+   private final QueryId id;
+   private long appSubmitTime;
+   private long startTime;
+   private long finishTime;
+   private TableDesc resultDesc;
+   private int completedStagesCount = 0;
+   private int successedStagesCount = 0;
+   private int killedStagesCount = 0;
+   private int failedStagesCount = 0;
+   private int erroredStagesCount = 0;
+   private final List<String> diagnostics = new ArrayList<String>();
+ 
+   // Internal Variables
+   private final Lock readLock;
+   private final Lock writeLock;
+   private int priority = 100;
+ 
+   // State Machine
+   private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+   private QueryState queryState;
+ 
+   // Transition Handler
+   private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+   private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+   private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
+   private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
+ 
+   protected static final StateMachineFactory
+       <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+       new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+           (QueryState.QUERY_NEW)
+ 
+           // Transitions from NEW state
+           .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING,
+               QueryEventType.START,
+               new StartTransition())
+           .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
+               QueryEventType.DIAGNOSTIC_UPDATE,
+               DIAGNOSTIC_UPDATE_TRANSITION)
+           .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+               QueryEventType.KILL,
+               new KillNewQueryTransition())
+           .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
+               QueryEventType.INTERNAL_ERROR,
+               INTERNAL_ERROR_TRANSITION)
+ 
+           // Transitions from RUNNING state
+           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+               QueryEventType.STAGE_COMPLETED,
+               STAGE_COMPLETED_TRANSITION)
+           .addTransition(QueryState.QUERY_RUNNING,
+               EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                   QueryState.QUERY_ERROR),
+               QueryEventType.QUERY_COMPLETED,
+               QUERY_COMPLETED_TRANSITION)
+           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+               QueryEventType.DIAGNOSTIC_UPDATE,
+               DIAGNOSTIC_UPDATE_TRANSITION)
+           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+               QueryEventType.KILL,
+               new KillAllStagesTransition())
+           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+               QueryEventType.INTERNAL_ERROR,
+               INTERNAL_ERROR_TRANSITION)
+ 
+           // Transitions from QUERY_SUCCEEDED state
+           .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+               QueryEventType.DIAGNOSTIC_UPDATE,
+               DIAGNOSTIC_UPDATE_TRANSITION)
+           // ignore-able transitions
+           .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+               QueryEventType.STAGE_COMPLETED,
+               STAGE_COMPLETED_TRANSITION)
+           .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+               QueryEventType.KILL)
+           .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
+               QueryEventType.INTERNAL_ERROR,
+               INTERNAL_ERROR_TRANSITION)
+ 
+           // Transitions from KILL_WAIT state
+           .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+               QueryEventType.STAGE_COMPLETED,
+               STAGE_COMPLETED_TRANSITION)
+           .addTransition(QueryState.QUERY_KILL_WAIT,
+               EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                   QueryState.QUERY_ERROR),
+               QueryEventType.QUERY_COMPLETED,
+               QUERY_COMPLETED_TRANSITION)
+           .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+               QueryEventType.DIAGNOSTIC_UPDATE,
+               DIAGNOSTIC_UPDATE_TRANSITION)
+           .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+               QueryEventType.INTERNAL_ERROR,
+               INTERNAL_ERROR_TRANSITION)
+           // Ignore-able transitions
+           .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED),
+               QueryEventType.KILL,
+               QUERY_COMPLETED_TRANSITION)
+ 
+           // Transitions from FAILED state
+           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+               QueryEventType.DIAGNOSTIC_UPDATE,
+               DIAGNOSTIC_UPDATE_TRANSITION)
+           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
+               QueryEventType.INTERNAL_ERROR,
+               INTERNAL_ERROR_TRANSITION)
+           // Ignore-able transitions
+           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+               QueryEventType.KILL)
+ 
+           // Transitions from ERROR state
+           .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+               QueryEventType.DIAGNOSTIC_UPDATE,
+               DIAGNOSTIC_UPDATE_TRANSITION)
+           .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+               QueryEventType.INTERNAL_ERROR,
+               INTERNAL_ERROR_TRANSITION)
+           // Ignore-able transitions
+           .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+               EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED))
+ 
+           .installTopology();
+ 
+   public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
+                final long appSubmitTime,
+                final String queryStr,
+                final EventHandler eventHandler,
+                final MasterPlan plan) {
+     this.context = context;
+     this.systemConf = context.getConf();
+     this.id = id;
+     this.clock = context.getClock();
+     this.appSubmitTime = appSubmitTime;
+     this.queryStr = queryStr;
+     this.stages = Maps.newConcurrentMap();
+     this.eventHandler = eventHandler;
+     this.plan = plan;
+     this.cursor = new ExecutionBlockCursor(plan, true);
+ 
+     StringBuilder sb = new StringBuilder("\n=======================================================");
+     sb.append("\nThe order of execution: \n");
+     int order = 1;
+     while (cursor.hasNext()) {
+       ExecutionBlock currentEB = cursor.nextBlock();
+       sb.append("\n").append(order).append(": ").append(currentEB.getId());
+       order++;
+     }
+     sb.append("\n=======================================================");
+     LOG.info(sb);
+     cursor.reset();
+ 
+     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+     this.readLock = readWriteLock.readLock();
+     this.writeLock = readWriteLock.writeLock();
+ 
+     stateMachine = stateMachineFactory.make(this);
+     queryState = stateMachine.getCurrentState();
+   }
+ 
+   public float getProgress() {
+     QueryState state = getState();
+     if (state == QueryState.QUERY_SUCCEEDED) {
+       return 1.0f;
+     } else {
+       int idx = 0;
+       List<Stage> tempStages = new ArrayList<Stage>();
+       synchronized(stages) {
+         tempStages.addAll(stages.values());
+       }
+ 
+       float [] subProgresses = new float[tempStages.size()];
+       for (Stage stage: tempStages) {
+         if (stage.getState() != StageState.NEW) {
+           subProgresses[idx] = stage.getProgress();
+         } else {
+           subProgresses[idx] = 0.0f;
+         }
+         idx++;
+       }
+ 
+       float totalProgress = 0.0f;
+       float proportion = 1.0f / (float)(getExecutionBlockCursor().size() - 1); // minus one is due to
+ 
+       for (int i = 0; i < subProgresses.length; i++) {
+         totalProgress += subProgresses[i] * proportion;
+       }
+ 
+       return totalProgress;
+     }
+   }
+ 
+   public long getAppSubmitTime() {
+     return this.appSubmitTime;
+   }
+ 
+   public long getStartTime() {
+     return startTime;
+   }
+ 
+   public void setStartTime() {
+     startTime = clock.getTime();
+   }
+ 
+   public long getFinishTime() {
+     return finishTime;
+   }
+ 
+   public void setFinishTime() {
+     finishTime = clock.getTime();
+   }
+ 
+   public QueryHistory getQueryHistory() {
+     QueryHistory queryHistory = makeQueryHistory();
+     queryHistory.setStageHistories(makeStageHistories());
+     return queryHistory;
+   }
+ 
+   private List<StageHistory> makeStageHistories() {
+     List<StageHistory> stageHistories = new ArrayList<StageHistory>();
+     for(Stage eachStage : getStages()) {
+       stageHistories.add(eachStage.getStageHistory());
+     }
+ 
+     return stageHistories;
+   }
+ 
+   private QueryHistory makeQueryHistory() {
+     QueryHistory queryHistory = new QueryHistory();
+ 
+     queryHistory.setQueryId(getId().toString());
+     queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName());
+     queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort());
+     queryHistory.setLogicalPlan(plan.toString());
+     queryHistory.setLogicalPlan(plan.getLogicalPlan().toString());
+     queryHistory.setDistributedPlan(plan.toString());
+ 
+     List<String[]> sessionVariables = new ArrayList<String[]>();
+     for(Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) {
+       if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) {
+         sessionVariables.add(new String[]{entry.getKey(), entry.getValue()});
+       }
+     }
+     queryHistory.setSessionVariables(sessionVariables);
+ 
+     return queryHistory;
+   }
+ 
+   public List<String> getDiagnostics() {
+     readLock.lock();
+     try {
+       return diagnostics;
+     } finally {
+       readLock.unlock();
+     }
+   }
+ 
+   protected void addDiagnostic(String diag) {
+     diagnostics.add(diag);
+   }
+ 
+   public TableDesc getResultDesc() {
+     return resultDesc;
+   }
+ 
+   public void setResultDesc(TableDesc desc) {
+     resultDesc = desc;
+   }
+ 
+   public MasterPlan getPlan() {
+     return plan;
+   }
+ 
+   public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+     return stateMachine;
+   }
+   
+   public void addStage(Stage stage) {
+     stages.put(stage.getId(), stage);
+   }
+   
+   public QueryId getId() {
+     return this.id;
+   }
+ 
+   public Stage getStage(ExecutionBlockId id) {
+     return this.stages.get(id);
+   }
+ 
+   public Collection<Stage> getStages() {
+     return this.stages.values();
+   }
+ 
+   public QueryState getSynchronizedState() {
+     readLock.lock();
+     try {
+       return stateMachine.getCurrentState();
+     } finally {
+       readLock.unlock();
+     }
+   }
+ 
+   /* non-blocking call for client API */
+   public QueryState getState() {
+     return queryState;
+   }
+ 
+   public ExecutionBlockCursor getExecutionBlockCursor() {
+     return cursor;
+   }
+ 
+   public static class StartTransition
+       implements SingleArcTransition<Query, QueryEvent> {
+ 
+     @Override
+     public void transition(Query query, QueryEvent queryEvent) {
+ 
+       query.setStartTime();
+       Stage stage = new Stage(query.context, query.getPlan(),
+           query.getExecutionBlockCursor().nextBlock());
+       stage.setPriority(query.priority--);
+       query.addStage(stage);
+ 
+       stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
+       LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
+     }
+   }
+ 
+   public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+ 
+     @Override
+     public QueryState transition(Query query, QueryEvent queryEvent) {
+       QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent;
+       QueryState finalState;
+ 
+       if (stageEvent.getState() == StageState.SUCCEEDED) {
+         finalState = finalizeQuery(query, stageEvent);
+       } else if (stageEvent.getState() == StageState.FAILED) {
+         finalState = QueryState.QUERY_FAILED;
+       } else if (stageEvent.getState() == StageState.KILLED) {
+         finalState = QueryState.QUERY_KILLED;
+       } else {
+         finalState = QueryState.QUERY_ERROR;
+       }
+       if (finalState != QueryState.QUERY_SUCCEEDED) {
+         Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
+         if (lastStage != null && lastStage.getTableMeta() != null) {
+           StoreType storeType = lastStage.getTableMeta().getStoreType();
+           if (storeType != null) {
+             LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+             try {
+               StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
+             } catch (IOException e) {
+               LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+             }
+           }
+         }
+       }
+       query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+       query.setFinishTime();
+ 
+       return finalState;
+     }
+ 
+     private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
+       Stage lastStage = query.getStage(event.getExecutionBlockId());
+       StoreType storeType = lastStage.getTableMeta().getStoreType();
+       try {
+         LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+         CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+         TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+ 
+         Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
+             .commitOutputData(query.context.getQueryContext(),
+                 lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
+ 
+         QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+         hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
+       } catch (Exception e) {
+         query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
+         return QueryState.QUERY_ERROR;
+       }
+ 
+       return QueryState.QUERY_SUCCEEDED;
+     }
+ 
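+     /**
+      * A post-query hook executed after the final output directory has been
+      * committed; each hook decides its own eligibility from the completed plan.
+      */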
+     private interface QueryHook {
+       boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
+       void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
+                    ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
+     }
+ 
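+     /** Runs every eligible {@link QueryHook} in registration order. */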
+     private class QueryHookExecutor {
+       private List<QueryHook> hookList = TUtil.newList();
+       private QueryMaster.QueryMasterContext context;
+ 
+       public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
+         this.context = context;
+         hookList.add(new MaterializedResultHook());
+         hookList.add(new CreateTableHook());
+         hookList.add(new InsertTableHook());
++        hookList.add(new CreateIndexHook());
+       }
+ 
+       public void execute(QueryContext queryContext, Query query,
+                           ExecutionBlockId finalExecBlockId,
+                           Path finalOutputDir) throws Exception {
+         for (QueryHook hook : hookList) {
+           if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
+             hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
+           }
+         }
+       }
++    }
++
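++    /**
++     * Registers the index built by a CREATE INDEX query in the catalog once the
++     * index data has been written successfully.
++     */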
++    private class CreateIndexHook implements QueryHook {
++
++      @Override
++      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
++        Stage lastStage = query.getStage(finalExecBlockId);
++        return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_INDEX;
++      }
++
++      @Override
++      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
++        CatalogService catalog = context.getWorkerContext().getCatalog();
++        Stage lastStage = query.getStage(finalExecBlockId);
++
++        CreateIndexNode createIndexNode = (CreateIndexNode) lastStage.getBlock().getPlan();
++        String databaseName, simpleIndexName, qualifiedIndexName;
++        if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
++          String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
++          databaseName = splits[0];
++          simpleIndexName = splits[1];
++          qualifiedIndexName = createIndexNode.getIndexName();
++        } else {
++          databaseName = queryContext.getCurrentDatabase();
++          simpleIndexName = createIndexNode.getIndexName();
++          qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
++        }
++        ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN);
++        if (scanNode == null) {
++          throw new IOException("Cannot find the table to be indexed");
++        }
++        IndexDesc indexDesc = new IndexDesc(databaseName, scanNode.getTableName(),
++            simpleIndexName, createIndexNode.getIndexPath(),
++            createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(),
++            createIndexNode.isUnique(), false, scanNode.getLogicalSchema());
++        if (catalog.createIndex(indexDesc)) {
++          LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + ".");
++        } else {
++          LOG.info("Index creation " + qualifiedIndexName + " is failed.");
++          throw new CatalogException("Cannot create index \"" + qualifiedIndexName + "\".");
++        }
++      }
+     }
+ 
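+     /**
+      * Builds the result table descriptor for queries that materialize their
+      * output without creating or inserting into a catalog table.
+      */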
+     private class MaterializedResultHook implements QueryHook {
+ 
+       @Override
+       public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                 Path finalOutputDir) {
+         Stage lastStage = query.getStage(finalExecBlockId);
+         NodeType type = lastStage.getBlock().getPlan().getType();
+         return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
+       }
+ 
+       @Override
+       public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                           Query query, ExecutionBlockId finalExecBlockId,
+                           Path finalOutputDir) throws Exception {
+         Stage lastStage = query.getStage(finalExecBlockId);
+         TableMeta meta = lastStage.getTableMeta();
+ 
+         String nullChar = queryContext.get(SessionVars.NULL_CHAR);
+         meta.putOption(StorageConstants.TEXT_NULL, nullChar);
+ 
+         TableStats stats = lastStage.getResultStats();
+ 
+         TableDesc resultTableDesc =
+             new TableDesc(
+                 query.getId().toString(),
+                 lastStage.getSchema(),
+                 meta,
+                 finalOutputDir.toUri());
+         resultTableDesc.setExternal(true);
+ 
+         stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+         resultTableDesc.setStats(stats);
+         query.setResultDesc(resultTableDesc);
+       }
+     }
+ 
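+     /**
+      * Registers the table produced by a CREATE TABLE query in the catalog,
+      * including its statistics and, if present, its partition method.
+      */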
+     private class CreateTableHook implements QueryHook {
+ 
+       @Override
+       public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                 Path finalOutputDir) {
+         Stage lastStage = query.getStage(finalExecBlockId);
+         return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
+       }
+ 
+       @Override
+       public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                           Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+         CatalogService catalog = context.getWorkerContext().getCatalog();
+         Stage lastStage = query.getStage(finalExecBlockId);
+         TableStats stats = lastStage.getResultStats();
+ 
+         CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
+         TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions());
+ 
+         TableDesc tableDescToBeCreated =
+             new TableDesc(
+                 createTableNode.getTableName(),
+                 createTableNode.getTableSchema(),
+                 meta,
+                 finalOutputDir.toUri());
+         tableDescToBeCreated.setExternal(createTableNode.isExternal());
+ 
+         if (createTableNode.hasPartition()) {
+           tableDescToBeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
+         }
+ 
+         stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+         tableDescToBeCreated.setStats(stats);
+         query.setResultDesc(tableDescToBeCreated);
+ 
+         catalog.createTable(tableDescToBeCreated);
+       }
+     }
+ 
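+     /**
+      * Finalizes an INSERT query: updates the catalog statistics when a target
+      * table exists, or builds an anonymous result descriptor otherwise.
+      */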
+     private class InsertTableHook implements QueryHook {
+ 
+       @Override
+       public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                 Path finalOutputDir) {
+         Stage lastStage = query.getStage(finalExecBlockId);
+         return lastStage.getBlock().getPlan().getType() == NodeType.INSERT;
+       }
+ 
+       @Override
+       public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                           Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
+           throws Exception {
+ 
+         CatalogService catalog = context.getWorkerContext().getCatalog();
+         Stage lastStage = query.getStage(finalExecBlockId);
+         TableMeta meta = lastStage.getTableMeta();
+         TableStats stats = lastStage.getResultStats();
+ 
+         InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
+ 
+         TableDesc finalTable;
+         if (insertNode.hasTargetTable()) {
+           String tableName = insertNode.getTableName();
+           finalTable = catalog.getTableDesc(tableName);
+         } else {
+           String tableName = query.getId().toString();
+           finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir.toUri());
+         }
+ 
+         long volume = getTableVolume(query.systemConf, finalOutputDir);
+         stats.setNumBytes(volume);
+         finalTable.setStats(stats);
+ 
+         if (insertNode.hasTargetTable()) {
+           UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder();
+           builder.setTableName(finalTable.getName());
+           builder.setStats(stats.getProto());
+ 
+           catalog.updateTableStats(builder.build());
+         }
+ 
+         query.setResultDesc(finalTable);
+       }
+     }
+   }
+ 
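+   /** Returns the total length in bytes of all files under the given table path. */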
+   public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+     FileSystem fs = tablePath.getFileSystem(systemConf);
+     ContentSummary directorySummary = fs.getContentSummary(tablePath);
+     return directorySummary.getLength();
+   }
+ 
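+   /**
+    * Invoked whenever a stage completes: updates the per-state stage counters
+    * and either schedules the next execution block or signals that the whole
+    * query has completed.
+    */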
+   public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+ 
+     private boolean hasNext(Query query) {
+       ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+       ExecutionBlock nextBlock = cursor.peek();
+       return !query.getPlan().isTerminal(nextBlock);
+     }
+ 
+     private void executeNextBlock(Query query) {
+       ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+       ExecutionBlock nextBlock = cursor.nextBlock();
+       Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
+       nextStage.setPriority(query.priority--);
+       query.addStage(nextStage);
+       nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
+ 
+       LOG.info("Scheduling Stage:" + nextStage.getId());
+       if(LOG.isDebugEnabled()) {
+         LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
+         LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
+       }
+     }
+ 
+     @Override
+     public void transition(Query query, QueryEvent event) {
+       try {
+         query.completedStagesCount++;
+         StageCompletedEvent castEvent = (StageCompletedEvent) event;
+ 
+         if (castEvent.getState() == StageState.SUCCEEDED) {
+           query.successedStagesCount++;
+         } else if (castEvent.getState() == StageState.KILLED) {
+           query.killedStagesCount++;
+         } else if (castEvent.getState() == StageState.FAILED) {
+           query.failedStagesCount++;
+         } else if (castEvent.getState() == StageState.ERROR) {
+           query.erroredStagesCount++;
+         } else {
+           LOG.error(String.format("Invalid Stage (%s) State %s at %s",
+               castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name()));
+           query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+         }
+ 
+         // if the stage succeeded and the query is still running
+         if (castEvent.getState() == StageState.SUCCEEDED &&  // latest stage succeeded
+             query.getSynchronizedState() == QueryState.QUERY_RUNNING &&     // current state is not KILL_WAIT, FAILED, or ERROR
+             hasNext(query)) {                                   // at least one stage remains
+           query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
+           executeNextBlock(query);
+         } else { // the query has completed, whether by success, kill, failure, or error
+           query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+         }
+       } catch (Throwable t) {
+         LOG.error(t.getMessage(), t);
+         query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+       }
+     }
+   }
+ 
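+   /** Appends a diagnostic message to the query. */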
+   private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
+     @Override
+     public void transition(Query query, QueryEvent event) {
+       query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+     }
+   }
+ 
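+   /** Handles a kill request arriving before any stage has run: the query is finished immediately. */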
+   private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
+     @Override
+     public void transition(Query query, QueryEvent event) {
+       query.setFinishTime();
+       query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+     }
+   }
+ 
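+   /** Propagates a kill request to every stage of this query. */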
+   private static class KillAllStagesTransition implements SingleArcTransition<Query, QueryEvent> {
+     @Override
+     public void transition(Query query, QueryEvent event) {
+       synchronized (query.stages) {
+         for (Stage stage : query.stages.values()) {
+           query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+         }
+       }
+     }
+   }
+ 
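+   /** Marks the finish time and notifies the QueryMaster after an internal error. */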
+   private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+ 
+     @Override
+     public void transition(Query query, QueryEvent event) {
+       query.setFinishTime();
+       query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+     }
+   }
+ 
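+   /**
+    * Dispatches a query event to the state machine under the write lock and
+    * logs any resulting state transition.
+    */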
+   @Override
+   public void handle(QueryEvent event) {
+     LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+     writeLock.lock();
+     try {
+       QueryState oldState = getSynchronizedState();
+       try {
+         getStateMachine().doTransition(event.getType(), event);
+         queryState = getSynchronizedState();
+       } catch (InvalidStateTransitonException e) {
+         LOG.error("Can't handle this event at current state"
+             + ", type:" + event
+             + ", oldState:" + oldState.name()
+             + ", nextState:" + getSynchronizedState().name()
+             , e);
+         eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
+       }
+ 
+       // log the state transition, if one occurred
+       if (oldState != getSynchronizedState()) {
+         LOG.info(id + " Query Transitioned from " + oldState + " to " + getSynchronizedState());
+       }
+     } finally {
+       writeLock.unlock();
+     }
+   }
+ }