Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:19 UTC

[11/13] tajo git commit: TAJO-1288: Refactoring org.apache.tajo.master package.

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
new file mode 100644
index 0000000..f645dc5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.codegen.CompilationError;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import com.google.protobuf.ByteString;
+
+public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
+  
+  private final Log LOG = LogFactory.getLog(getClass());
+  
+  private MasterContext masterContext;
+  private LogicalPlan logicalPlan;
+  private final QueryId queryId;
+  private final String sessionId;
+  private TaskAttemptContext taskContext;
+  private int currentRow;
+  private long maxRow;
+  private TableDesc tableDesc;
+  private Schema outSchema;
+  private RowStoreEncoder encoder;
+  private PhysicalExec physicalExec;
+  
+  public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId, 
+      String sessionId, int maxRow) {
+    masterContext = context;
+    logicalPlan = plan;
+    this.queryId = queryId;
+    this.sessionId = sessionId;
+    this.maxRow = maxRow;
+  }
+  
+  @Override
+  public void init() throws IOException {
+    QueryContext queryContext = new QueryContext(masterContext.getConf());
+    currentRow = 0;
+    
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
+    try {
+      globalPlanner.build(masterPlan);
+    } catch (PlanningException e) {
+      throw new RuntimeException(e);
+    }
+    
+    ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
+    ExecutionBlock leafBlock = null;
+    while (cursor.hasNext()) {
+      ExecutionBlock block = cursor.nextBlock();
+      if (masterPlan.isLeaf(block)) {
+        leafBlock = block;
+        break;
+      }
+    }
+    
+    if (leafBlock == null) {
+      throw new IOException("No leaf execution block found for system table scan");
+    }
+
+    taskContext = new TaskAttemptContext(queryContext, null,
+        new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0),
+        null, null);
+    physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf())
+      .createPlan(taskContext, leafBlock.getPlan());
+    
+    tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(), 
+        new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null);
+    outSchema = physicalExec.getSchema();
+    encoder = RowStoreUtil.createEncoder(getLogicalSchema());
+    
+    physicalExec.init();
+  }
+
+  @Override
+  public void close() throws Exception {
+    tableDesc = null;
+    outSchema = null;
+    encoder = null;
+    if (physicalExec != null) {
+      try {
+        physicalExec.close();
+      } catch (Exception ignored) {}
+    }
+    physicalExec = null;
+    currentRow = -1;
+  }
+  
+  private List<Tuple> getTablespaces(Schema outSchema) {
+    List<TablespaceProto> tablespaces = masterContext.getCatalog().getAllTablespaces();
+    List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size());
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple;
+    
+    for (TablespaceProto tablespace: tablespaces) {
+      aTuple = new VTuple(outSchema.size());
+      
+      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+        Column column = columns.get(fieldId);
+        if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
+          if (tablespace.hasId()) {
+            aTuple.put(fieldId, DatumFactory.createInt4(tablespace.getId()));
+          } else {
+            aTuple.put(fieldId, DatumFactory.createNullDatum());
+          }
+        } else if ("space_name".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName()));
+        } else if ("space_handler".equalsIgnoreCase(column.getSimpleName())) {
+          if (tablespace.hasHandler()) {
+            aTuple.put(fieldId, DatumFactory.createText(tablespace.getHandler()));
+          } else {
+            aTuple.put(fieldId, DatumFactory.createNullDatum());
+          }
+        } else if ("space_uri".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri()));
+        }
+      }
+      tuples.add(aTuple);
+    }
+    
+    return tuples;    
+  }
+  
+  private List<Tuple> getDatabases(Schema outSchema) {
+    List<DatabaseProto> databases = masterContext.getCatalog().getAllDatabases();
+    List<Tuple> tuples = new ArrayList<Tuple>(databases.size());
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple;
+    
+    for (DatabaseProto database: databases) {
+      aTuple = new VTuple(outSchema.size());
+      
+      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+        Column column = columns.get(fieldId);
+        if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(database.getId()));
+        } else if ("db_name".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(database.getName()));
+        } else if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
+          if (database.hasSpaceId()) {
+            aTuple.put(fieldId, DatumFactory.createInt4(database.getSpaceId()));
+          } else {
+            aTuple.put(fieldId, DatumFactory.createNullDatum());
+          }
+        }
+      }
+      
+      tuples.add(aTuple);
+    }
+    
+    return tuples;
+  }
+  
+  private List<Tuple> getTables(Schema outSchema) {
+    List<TableDescriptorProto> tables = masterContext.getCatalog().getAllTables();
+    List<Tuple> tuples = new ArrayList<Tuple>(tables.size());
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple;
+    
+    for (TableDescriptorProto table: tables) {
+      aTuple = new VTuple(outSchema.size());
+      
+      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+        Column column = columns.get(fieldId);
+        if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(table.getTid()));
+        } else if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(table.getDbId()));
+        } else if ("table_name".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(table.getName()));
+        } else if ("table_type".equalsIgnoreCase(column.getSimpleName())) {
+          if (table.hasTableType()) {
+            aTuple.put(fieldId, DatumFactory.createText(table.getTableType()));
+          } else {
+            aTuple.put(fieldId, DatumFactory.createNullDatum());
+          }
+        } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(table.getPath()));
+        } else if ("store_type".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(table.getStoreType()));
+        }
+      }
+      
+      tuples.add(aTuple);
+    }
+    
+    return tuples;
+  }
+  
+  private List<Tuple> getColumns(Schema outSchema) {
+    List<ColumnProto> columnsList = masterContext.getCatalog().getAllColumns();
+    List<Tuple> tuples = new ArrayList<Tuple>(columnsList.size());
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple;
+    int columnId = 1, prevtid = -1, tid = 0;
+    
+    for (ColumnProto column: columnsList) {
+      aTuple = new VTuple(outSchema.size());
+      
+      tid = column.getTid();
+      if (prevtid != tid) {
+        columnId = 1;
+        prevtid = tid;
+      }
+      
+      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+        Column colObj = columns.get(fieldId);
+        
+        if ("tid".equalsIgnoreCase(colObj.getSimpleName())) {
+          if (column.hasTid()) {
+            aTuple.put(fieldId, DatumFactory.createInt4(tid));
+          } else {
+            aTuple.put(fieldId, DatumFactory.createNullDatum());
+          }
+        } else if ("column_name".equalsIgnoreCase(colObj.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(column.getName()));
+        } else if ("ordinal_position".equalsIgnoreCase(colObj.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(columnId));
+        } else if ("data_type".equalsIgnoreCase(colObj.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(column.getDataType().getType().toString()));
+        } else if ("type_length".equalsIgnoreCase(colObj.getSimpleName())) {
+          DataType dataType = column.getDataType();
+          if (dataType.hasLength()) {
+            aTuple.put(fieldId, DatumFactory.createInt4(dataType.getLength()));
+          } else {
+            aTuple.put(fieldId, DatumFactory.createNullDatum());
+          }
+        }
+      }
+      
+      columnId++;
+      tuples.add(aTuple);
+    }
+    
+    return tuples;
+  }
+  
+  private List<Tuple> getIndexes(Schema outSchema) {
+    List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
+    List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple;
+    
+    for (IndexProto index: indexList) {
+      aTuple = new VTuple(outSchema.size());
+      
+      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+        Column column = columns.get(fieldId);
+        
+        if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
+        } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
+        } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
+        } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
+        } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
+        } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
+        } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
+        } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
+        } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
+        }
+      }
+      
+      tuples.add(aTuple);
+    }
+    
+    return tuples;
+  }
+  
+  private List<Tuple> getAllTableOptions(Schema outSchema) {
+    List<TableOptionProto> optionList = masterContext.getCatalog().getAllTableOptions();
+    List<Tuple> tuples = new ArrayList<Tuple>(optionList.size());
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple;
+    
+    for (TableOptionProto option: optionList) {
+      aTuple = new VTuple(outSchema.size());
+      
+      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+        Column column = columns.get(fieldId);
+        
+        if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(option.getTid()));
+        } else if ("key_".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getKey()));
+        } else if ("value_".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getValue()));
+        }
+      }
+      
+      tuples.add(aTuple);
+    }
+    
+    return tuples;
+  }
+  
+  private List<Tuple> getAllTableStats(Schema outSchema) {
+    List<TableStatsProto> statList = masterContext.getCatalog().getAllTableStats();
+    List<Tuple> tuples = new ArrayList<Tuple>(statList.size());
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple;
+    
+    for (TableStatsProto stat: statList) {
+      aTuple = new VTuple(outSchema.size());
+      
+      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+        Column column = columns.get(fieldId);
+        
+        if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(stat.getTid()));
+        } else if ("num_rows".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumRows()));
+        } else if ("num_bytes".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumBytes()));
+        }
+      }
+      
+      tuples.add(aTuple);
+    }
+    
+    return tuples;
+  }
+  
+  private List<Tuple> getAllPartitions(Schema outSchema) {
+    List<TablePartitionProto> partitionList = masterContext.getCatalog().getAllPartitions();
+    List<Tuple> tuples = new ArrayList<Tuple>(partitionList.size());
+    List<Column> columns = outSchema.getColumns();
+    Tuple aTuple;
+    
+    for (TablePartitionProto partition: partitionList) {
+      aTuple = new VTuple(outSchema.size());
+      
+      for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+        Column column = columns.get(fieldId);
+        
+        if ("pid".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid()));
+        } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid()));
+        } else if ("partition_name".equalsIgnoreCase(column.getSimpleName())) {
+          if (partition.hasPartitionName()) {
+            aTuple.put(fieldId, DatumFactory.createText(partition.getPartitionName()));
+          } else {
+            aTuple.put(fieldId, DatumFactory.createNullDatum());
+          }
+        } else if ("ordinal_position".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition()));
+        } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(partition.getPath()));
+        }
+      }
+      
+      tuples.add(aTuple);
+    }
+    
+    return tuples;
+  }
+  
+  private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
+    // Default to an empty list so an unknown table name cannot return null and NPE in rescan().
+    List<Tuple> tuples = new ArrayList<Tuple>();
+    String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
+
+    if ("tablespace".equalsIgnoreCase(tableName)) {
+      tuples = getTablespaces(inSchema);
+    } else if ("databases".equalsIgnoreCase(tableName)) {
+      tuples = getDatabases(inSchema);
+    } else if ("tables".equalsIgnoreCase(tableName)) {
+      tuples = getTables(inSchema);
+    } else if ("columns".equalsIgnoreCase(tableName)) {
+      tuples = getColumns(inSchema);
+    } else if ("indexes".equalsIgnoreCase(tableName)) {
+      tuples = getIndexes(inSchema);
+    } else if ("table_options".equalsIgnoreCase(tableName)) {
+      tuples = getAllTableOptions(inSchema);
+    } else if ("table_stats".equalsIgnoreCase(tableName)) {
+      tuples = getAllTableStats(inSchema);
+    } else if ("partitions".equalsIgnoreCase(tableName)) {
+      tuples = getAllPartitions(inSchema);
+    }
+    
+    return tuples;    
+  }
+
+  @Override
+  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+    List<ByteString> rows = new ArrayList<ByteString>();
+    int startRow = currentRow;
+    int endRow = startRow + fetchRowNum;
+    
+    if (physicalExec == null) {
+      return rows;
+    }
+    
+    while (currentRow < endRow) {
+      Tuple currentTuple = physicalExec.next();
+      
+      if (currentTuple == null) {
+        physicalExec.close();
+        physicalExec = null;
+        break;
+      }
+      
+      currentRow++;
+      rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple)));
+      
+      if (currentRow >= maxRow) {
+        physicalExec.close();
+        physicalExec = null;
+        break;
+      }
+    }
+    
+    return rows;
+  }
+
+  @Override
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  @Override
+  public String getSessionId() {
+    return sessionId;
+  }
+  
+  @Override
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+  
+  @Override
+  public Schema getLogicalSchema() {
+    return outSchema;
+  }
+  
+  class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {
+
+    public SimplePhysicalPlannerImpl(TajoConf conf) {
+      super(conf);
+    }
+
+    @Override
+    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+        throws IOException {
+      return new SystemPhysicalExec(ctx, scanNode);
+    }
+
+    @Override
+    public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException {
+      return new SystemPhysicalExec(ctx, annotation);
+    }
+  }
+  
+  class SystemPhysicalExec extends PhysicalExec {
+    
+    private ScanNode scanNode;
+    private EvalNode qual;
+    private Projector projector;
+    private TableStats tableStats;
+    private final List<Tuple> cachedData;
+    private int currentRow;
+    private boolean isClosed;
+
+    public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) {
+      super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+      this.scanNode = scanNode;
+      this.qual = this.scanNode.getQual();
+      cachedData = TUtil.newList();
+      currentRow = 0;
+      isClosed = false;
+      
+      projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      Tuple aTuple = null;
+      Tuple outTuple = new VTuple(outColumnNum);
+      
+      if (isClosed) {
+        return null;
+      }
+      
+      if (cachedData.size() == 0) {
+        rescan();
+      }
+      
+      if (!scanNode.hasQual()) {
+        if (currentRow < cachedData.size()) {
+          aTuple = cachedData.get(currentRow++);
+          projector.eval(aTuple, outTuple);
+          outTuple.setOffset(aTuple.getOffset());
+          return outTuple;
+        }
+        return null;
+      } else {
+        while (currentRow < cachedData.size()) {
+          aTuple = cachedData.get(currentRow++);
+          if (qual.eval(inSchema, aTuple).isTrue()) {
+            projector.eval(aTuple, outTuple);
+            return outTuple;
+          }
+        }
+        return null;
+      }
+    }
+
+    @Override
+    public void rescan() throws IOException {
+      cachedData.clear();
+      cachedData.addAll(fetchSystemTable(scanNode.getTableDesc(), inSchema));
+
+      tableStats = new TableStats();
+      tableStats.setNumRows(cachedData.size());
+    }
+
+    @Override
+    public void close() throws IOException {
+      scanNode = null;
+      qual = null;
+      projector = null;
+      cachedData.clear();
+      currentRow = -1;
+      isClosed = true;
+    }
+
+    @Override
+    public float getProgress() {
+      return 1.0f;
+    }
+
+    @Override
+    protected void compile() throws CompilationError {
+      if (scanNode.hasQual()) {
+        qual = context.getPrecompiledEval(inSchema, qual);
+      }
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      return tableStats;
+    }
+    
+  }
+
+}
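
A minimal usage sketch of the scanner above (illustrative only, not part of this
commit): a master-side caller builds a LogicalPlan for a catalog query, initializes
the scanner, and pages encoded rows out of it. The helper name, batch size, and
maxRow value below are assumptions.

    // Hypothetical driver; assumes a MasterContext, a LogicalPlan for a system-table
    // query, and a QueryId are already available in the calling code.
    public static List<ByteString> scanSystemTable(MasterContext context, LogicalPlan plan,
                                                   QueryId queryId, String sessionId) throws Exception {
      NonForwardQueryResultScanner scanner =
          new NonForwardQueryResultSystemScanner(context, plan, queryId, sessionId, 100 /* maxRow */);
      scanner.init();                                  // builds the MasterPlan and a SystemPhysicalExec
      List<ByteString> rows = new ArrayList<ByteString>();
      List<ByteString> batch;
      while (!(batch = scanner.getNextRows(10)).isEmpty()) {
        rows.addAll(batch);                            // one RowStoreEncoder-encoded tuple per ByteString
      }
      scanner.close();
      return rows;
    }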

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2242445..2fbebc1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -41,15 +41,12 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
-import org.apache.tajo.master.NonForwardQueryResultFileScanner;
-import org.apache.tajo.master.NonForwardQueryResultScanner;
-import org.apache.tajo.master.NonForwardQueryResultSystemScanner;
-import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.*;
 import org.apache.tajo.master.exec.prehook.CreateTableHook;
 import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
 import org.apache.tajo.master.exec.prehook.InsertIntoHook;
-import org.apache.tajo.master.querymaster.*;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.querymaster.*;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.Target;

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
deleted file mode 100644
index 3d6669c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.ha;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The HAService is responsible for electing the active TajoMaster on startup or when the
- * current active master changes (e.g., due to failure), and for monitoring the health of TajoMaster.
- *
- */
-public interface HAService {
-
-  /**
-   * Add master name to shared storage.
-   */
-  public void register() throws IOException;
-
-
-  /**
-   * Delete master name from shared storage.
-   *
-   */
-  public void delete() throws IOException;
-
-  /**
-   *
-   * @return True if current master is an active master.
-   */
-  public boolean isActiveStatus();
-
-  /**
-   *
-   * @return the list of all registered masters
-   * @throws IOException
-   */
-  public List<TajoMasterInfo> getMasters() throws IOException;
-
-}
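
To make the interface contract concrete, here is a minimal in-memory stub
(illustrative only; the class name and bookkeeping are assumptions, and a real
implementation would persist state to shared storage as HAServiceHDFSImpl below does):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Toy HAService: the first master to register becomes active; no shared storage.
    public class InMemoryHAService implements HAService {
      private final List<TajoMasterInfo> masters = new ArrayList<TajoMasterInfo>();
      private boolean active = false;

      @Override
      public void register() throws IOException {
        active = masters.isEmpty();      // the first registrant wins the active role
        TajoMasterInfo info = new TajoMasterInfo();
        info.setActive(active);
        info.setAvailable(true);
        masters.add(info);
      }

      @Override
      public void delete() throws IOException {
        masters.clear();
        active = false;
      }

      @Override
      public boolean isActiveStatus() {
        return active;
      }

      @Override
      public List<TajoMasterInfo> getMasters() throws IOException {
        return masters;
      }
    }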

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
deleted file mode 100644
index 45219b3..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.ha;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAConstants;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * This implements HAService using an HDFS cluster; master status is saved to HDFS.
- *
- */
-public class HAServiceHDFSImpl implements HAService {
-  private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
-
-  private MasterContext context;
-  private TajoConf conf;
-
-  private FileSystem fs;
-
-  private String masterName;
-  private Path rootPath;
-  private Path haPath;
-  private Path activePath;
-  private Path backupPath;
-
-  private boolean isActiveStatus = false;
-
-  // Thread which runs periodically to check how long it has been since a heartbeat was received.
-  private Thread checkerThread;
-  private volatile boolean stopped = false;
-
-  private int monitorInterval;
-
-  private String currentActiveMaster;
-
-  public HAServiceHDFSImpl(MasterContext context) throws IOException {
-    this.context = context;
-    this.conf = context.getConf();
-    initSystemDirectory();
-
-    InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
-    this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
-
-    monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
-  }
-
-  private void initSystemDirectory() throws IOException {
-    // Get Tajo root dir
-    this.rootPath = TajoConf.getTajoRootDir(conf);
-
-    // Check Tajo root dir
-    this.fs = rootPath.getFileSystem(conf);
-
-    // Check and create Tajo system HA dir
-    haPath = TajoConf.getSystemHADir(conf);
-    if (!fs.exists(haPath)) {
-      fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA dir '" + haPath + "' is created");
-    }
-
-    activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
-    if (!fs.exists(activePath)) {
-      fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA Active dir '" + activePath + "' is created");
-    }
-
-    backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
-    if (!fs.exists(backupPath)) {
-      fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
-      LOG.info("System HA Backup dir '" + backupPath + "' is created");
-    }
-  }
-
-  private void startPingChecker() {
-    if (checkerThread == null) {
-      checkerThread = new Thread(new PingChecker());
-      checkerThread.setName("Ping Checker");
-      checkerThread.start();
-    }
-  }
-
-  @Override
-  public void register() throws IOException {
-    FileStatus[] files = fs.listStatus(activePath);
-
-    // Phase 1: If there is no other active master, this master tries to become the active master.
-    if (files.length == 0) {
-      createMasterFile(true);
-      currentActiveMaster = masterName;
-      LOG.info(String.format("This master is added as the active master (%s)", masterName));
-    } else {
-      // Phase 2: If there is active master information, we need to check its status.
-      Path activePath = files[0].getPath();
-      currentActiveMaster = activePath.getName().replaceAll("_", ":");
-
-      // Phase 3: If the current active master is dead, this master should become the active master.
-      if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
-        fs.delete(activePath, true);
-        createMasterFile(true);
-        currentActiveMaster = masterName;
-        LOG.info(String.format("This master is added as the active master (%s)", masterName));
-      } else {
-        // Phase 4: If the current active master is alive, this master needs to be a backup master.
-        createMasterFile(false);
-        LOG.info(String.format("This master is added as a backup master (%s)", masterName));
-      }
-    }
-  }
-
-  private void createMasterFile(boolean isActive) throws IOException {
-    String fileName = masterName.replaceAll(":", "_");
-    Path path = null;
-
-    if (isActive) {
-      path = new Path(activePath, fileName);
-    } else {
-      path = new Path(backupPath, fileName);
-    }
-
-    StringBuilder sb = new StringBuilder();
-    InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.CATALOG_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
-    address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
-    sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
-
-    FSDataOutputStream out = fs.create(path);
-
-    try {
-      out.writeUTF(sb.toString());
-      out.hflush();
-      out.close();
-    } catch (FileAlreadyExistsException e) {
-      createMasterFile(false);
-    }
-
-    isActiveStatus = isActive;
-
-    startPingChecker();
-  }
-
-
-  private InetSocketAddress getHostAddress(int type) {
-    InetSocketAddress address = null;
-
-    switch (type) {
-      case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
-        break;
-      case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .TAJO_MASTER_CLIENT_RPC_ADDRESS);
-        break;
-      case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .RESOURCE_TRACKER_RPC_ADDRESS);
-        break;
-      case HAConstants.CATALOG_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .CATALOG_ADDRESS);
-        break;
-      case HAConstants.MASTER_INFO_ADDRESS:
-        address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
-          .TAJO_MASTER_INFO_ADDRESS);
-        break;
-      default:
-        break;
-    }
-
-    return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
-  }
-
-  @Override
-  public void delete() throws IOException {
-    String fileName = masterName.replaceAll(":", "_");
-
-    Path activeFile = new Path(activePath, fileName);
-    if (fs.exists(activeFile)) {
-      fs.delete(activeFile, true);
-    }
-
-    Path backupFile = new Path(backupPath, fileName);
-    if (fs.exists(backupFile)) {
-      fs.delete(backupFile, true);
-    }
-    if (isActiveStatus) {
-      isActiveStatus = false;
-    }
-    stopped = true;
-  }
-
-  @Override
-  public boolean isActiveStatus() {
-    return isActiveStatus;
-  }
-
-  @Override
-  public List<TajoMasterInfo> getMasters() throws IOException {
-    List<TajoMasterInfo> list = TUtil.newList();
-    Path path = null;
-
-    FileStatus[] files = fs.listStatus(activePath);
-    if (files.length == 1) {
-      path = files[0].getPath();
-      list.add(createTajoMasterInfo(path, true));
-    }
-
-    files = fs.listStatus(backupPath);
-    for (FileStatus status : files) {
-      path = status.getPath();
-      list.add(createTajoMasterInfo(path, false));
-    }
-
-    return list;
-  }
-
-  private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
-    String masterAddress = path.getName().replaceAll("_", ":");
-    boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
-
-    FSDataInputStream stream = fs.open(path);
-    String data = stream.readUTF();
-
-    stream.close();
-
-    String[] addresses = data.split("_");
-    TajoMasterInfo info = new TajoMasterInfo();
-
-    info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
-    info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
-    info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
-    info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
-    info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
-
-    info.setAvailable(isAlive);
-    info.setActive(isActive);
-
-    return info;
-  }
-
-  private class PingChecker implements Runnable {
-    @Override
-    public void run() {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        synchronized (HAServiceHDFSImpl.this) {
-          try {
-            if (!currentActiveMaster.equals(masterName)) {
-              boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
-                  + ", isAlive:" + isAlive);
-              }
-
-              // If active master is dead, this master should be active master instead of
-              // previous active master.
-              if (!isAlive) {
-                FileStatus[] files = fs.listStatus(activePath);
-                if (files.length == 0 || (files.length == 1
-                  && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
-                  delete();
-                  register();
-                }
-              }
-            }
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        }
-        try {
-          Thread.sleep(monitorInterval);
-        } catch (InterruptedException e) {
-          LOG.info("PingChecker interrupted. - masterName:" + masterName);
-          break;
-        }
-      }
-    }
-  }
-}
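
The file layout used by the implementation above is easy to miss in the code: each
active/backup file is named after the master address with ':' replaced by '_', and
the file body packs four service addresses joined by '_'. A sketch of that
convention (helper names are hypothetical):

    // "host:26001" <-> "host_26001"
    static String toFileName(String masterAddress) {
      return masterAddress.replaceAll(":", "_");
    }

    static String toMasterAddress(String fileName) {
      return fileName.replaceAll("_", ":");
    }

    // File body: clientAddr "_" resourceTrackerAddr "_" catalogAddr "_" webServerAddr,
    // each of the form "host:port"; split("_") recovers the four addresses again.
    static String packAddresses(String client, String tracker, String catalog, String web) {
      return client + "_" + tracker + "_" + catalog + "_" + web;
    }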

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
deleted file mode 100644
index 6ed975a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.ha;
-
-import java.net.InetSocketAddress;
-
-public class TajoMasterInfo {
-
-  private boolean available;
-  private boolean isActive;
-
-  private InetSocketAddress tajoMasterAddress;
-  private InetSocketAddress tajoClientAddress;
-  private InetSocketAddress workerResourceTrackerAddr;
-  private InetSocketAddress catalogAddress;
-  private InetSocketAddress webServerAddress;
-
-  public InetSocketAddress getTajoMasterAddress() {
-    return tajoMasterAddress;
-  }
-
-  public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
-    this.tajoMasterAddress = tajoMasterAddress;
-  }
-
-  public InetSocketAddress getTajoClientAddress() {
-    return tajoClientAddress;
-  }
-
-  public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
-    this.tajoClientAddress = tajoClientAddress;
-  }
-
-  public InetSocketAddress getWorkerResourceTrackerAddr() {
-    return workerResourceTrackerAddr;
-  }
-
-  public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
-    this.workerResourceTrackerAddr = workerResourceTrackerAddr;
-  }
-
-  public InetSocketAddress getCatalogAddress() {
-    return catalogAddress;
-  }
-
-  public void setCatalogAddress(InetSocketAddress catalogAddress) {
-    this.catalogAddress = catalogAddress;
-  }
-
-  public InetSocketAddress getWebServerAddress() {
-    return webServerAddress;
-  }
-
-  public void setWebServerAddress(InetSocketAddress webServerAddress) {
-    this.webServerAddress = webServerAddress;
-  }
-
-  public boolean isAvailable() {
-    return available;
-  }
-
-  public void setAvailable(boolean available) {
-    this.available = available;
-  }
-
-  public boolean isActive() {
-    return isActive;
-  }
-
-  public void setActive(boolean active) {
-    isActive = active;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
deleted file mode 100644
index 7c3d283..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.metrics;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricSet;
-import org.apache.tajo.master.TajoMaster;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-
-public class CatalogMetricsGaugeSet implements MetricSet {
-  TajoMaster.MasterContext tajoMasterContext;
-  public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
-    this.tajoMasterContext = tajoMasterContext;
-  }
-
-  @Override
-  public Map<String, Metric> getMetrics() {
-    Map<String, Metric> metricsMap = new HashMap<String, Metric>();
-    metricsMap.put("numTables", new Gauge<Integer>() {
-      @Override
-      public Integer getValue() {
-        return tajoMasterContext.getCatalog().getAllTableNames(DEFAULT_DATABASE_NAME).size();
-      }
-    });
-
-    metricsMap.put("numFunctions", new Gauge<Integer>() {
-      @Override
-      public Integer getValue() {
-        return tajoMasterContext.getCatalog().getFunctions().size();
-      }
-    });
-
-    return metricsMap;
-  }
-}
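
A MetricSet like the one above is hooked into a codahale MetricRegistry via
registerAll(); a short sketch (the reporter choice and the masterContext variable
are assumptions):

    import com.codahale.metrics.ConsoleReporter;
    import com.codahale.metrics.MetricRegistry;

    // "masterContext" is assumed to be the running TajoMaster.MasterContext.
    MetricRegistry registry = new MetricRegistry();
    registry.registerAll(new CatalogMetricsGaugeSet(masterContext)); // gauges: numTables, numFunctions
    ConsoleReporter.forRegistry(registry).build().report();          // one-shot dump of gauge values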

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
deleted file mode 100644
index 993d3b7..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.metrics;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricSet;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerState;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class WorkerResourceMetricsGaugeSet implements MetricSet {
-  TajoMaster.MasterContext tajoMasterContext;
-  public WorkerResourceMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
-    this.tajoMasterContext = tajoMasterContext;
-  }
-
-  @Override
-  public Map<String, Metric> getMetrics() {
-    Map<String, Metric> metricsMap = new HashMap<String, Metric>();
-    metricsMap.put("totalWorkers", new Gauge<Integer>() {
-      @Override
-      public Integer getValue() {
-        return tajoMasterContext.getResourceManager().getWorkers().size();
-      }
-    });
-
-    metricsMap.put("liveWorkers", new Gauge<Integer>() {
-      @Override
-      public Integer getValue() {
-        return getNumWorkers(WorkerState.RUNNING);
-      }
-    });
-
-    metricsMap.put("deadWorkers", new Gauge<Integer>() {
-      @Override
-      public Integer getValue() {
-        return getNumWorkers(WorkerState.LOST);
-      }
-    });
-
-    return metricsMap;
-  }
-
-  protected int getNumWorkers(WorkerState status) {
-    int numWorkers = 0;
-    for(Worker eachWorker: tajoMasterContext.getResourceManager().getWorkers().values()) {
-      if(eachWorker.getState() == status) {
-        numWorkers++;
-      }
-    }
-
-    return numWorkers;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
deleted file mode 100644
index a626df1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ /dev/null
@@ -1,738 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.util.TUtil;
-import org.apache.tajo.util.history.QueryHistory;
-import org.apache.tajo.util.history.StageHistory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class Query implements EventHandler<QueryEvent> {
-  private static final Log LOG = LogFactory.getLog(Query.class);
-
-  // Facilities for Query
-  private final TajoConf systemConf;
-  private final Clock clock;
-  private String queryStr;
-  private Map<ExecutionBlockId, Stage> stages;
-  private final EventHandler eventHandler;
-  private final MasterPlan plan;
-  QueryMasterTask.QueryMasterTaskContext context;
-  private ExecutionBlockCursor cursor;
-
-  // Query Status
-  private final QueryId id;
-  private long appSubmitTime;
-  private long startTime;
-  private long finishTime;
-  private TableDesc resultDesc;
-  private int completedStagesCount = 0;
-  private int successedStagesCount = 0;
-  private int killedStagesCount = 0;
-  private int failedStagesCount = 0;
-  private int erroredStagesCount = 0;
-  private final List<String> diagnostics = new ArrayList<String>();
-
-  // Internal Variables
-  private final Lock readLock;
-  private final Lock writeLock;
-  private int priority = 100;
-
-  // State Machine
-  private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
-  private QueryState queryState;
-
-  // Transition Handler
-  private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
-  private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
-  private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
-  private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
-
-  protected static final StateMachineFactory
-      <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
-      new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
-          (QueryState.QUERY_NEW)
-
-          // Transitions from NEW state
-          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING,
-              QueryEventType.START,
-              new StartTransition())
-          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
-              QueryEventType.DIAGNOSTIC_UPDATE,
-              DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
-              QueryEventType.KILL,
-              new KillNewQueryTransition())
-          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
-              QueryEventType.INTERNAL_ERROR,
-              INTERNAL_ERROR_TRANSITION)
-
-          // Transitions from RUNNING state
-          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
-              QueryEventType.STAGE_COMPLETED,
-              STAGE_COMPLETED_TRANSITION)
-          .addTransition(QueryState.QUERY_RUNNING,
-              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
-                  QueryState.QUERY_ERROR),
-              QueryEventType.QUERY_COMPLETED,
-              QUERY_COMPLETED_TRANSITION)
-          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
-              QueryEventType.DIAGNOSTIC_UPDATE,
-              DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
-              QueryEventType.KILL,
-              new KillAllStagesTransition())
-          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
-              QueryEventType.INTERNAL_ERROR,
-              INTERNAL_ERROR_TRANSITION)
-
-          // Transitions from QUERY_SUCCEEDED state
-          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
-              QueryEventType.DIAGNOSTIC_UPDATE,
-              DIAGNOSTIC_UPDATE_TRANSITION)
-          // ignore-able transitions
-          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
-              QueryEventType.STAGE_COMPLETED,
-              STAGE_COMPLETED_TRANSITION)
-          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
-              QueryEventType.KILL)
-          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
-              QueryEventType.INTERNAL_ERROR,
-              INTERNAL_ERROR_TRANSITION)
-
-          // Transitions from KILL_WAIT state
-          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
-              QueryEventType.STAGE_COMPLETED,
-              STAGE_COMPLETED_TRANSITION)
-          .addTransition(QueryState.QUERY_KILL_WAIT,
-              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
-                  QueryState.QUERY_ERROR),
-              QueryEventType.QUERY_COMPLETED,
-              QUERY_COMPLETED_TRANSITION)
-          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
-              QueryEventType.DIAGNOSTIC_UPDATE,
-              DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
-              QueryEventType.INTERNAL_ERROR,
-              INTERNAL_ERROR_TRANSITION)
-          // Ignore-able transitions
-          .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED),
-              QueryEventType.KILL,
-              QUERY_COMPLETED_TRANSITION)
-
-          // Transitions from FAILED state
-          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
-              QueryEventType.DIAGNOSTIC_UPDATE,
-              DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
-              QueryEventType.INTERNAL_ERROR,
-              INTERNAL_ERROR_TRANSITION)
-          // Ignore-able transitions
-          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
-              QueryEventType.KILL)
-
-          // Transitions from ERROR state
-          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
-              QueryEventType.DIAGNOSTIC_UPDATE,
-              DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
-              QueryEventType.INTERNAL_ERROR,
-              INTERNAL_ERROR_TRANSITION)
-          // Ignore-able transitions
-          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
-              EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED))
-
-          .installTopology();
-
-  public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
-               final long appSubmitTime,
-               final String queryStr,
-               final EventHandler eventHandler,
-               final MasterPlan plan) {
-    this.context = context;
-    this.systemConf = context.getConf();
-    this.id = id;
-    this.clock = context.getClock();
-    this.appSubmitTime = appSubmitTime;
-    this.queryStr = queryStr;
-    this.stages = Maps.newConcurrentMap();
-    this.eventHandler = eventHandler;
-    this.plan = plan;
-    this.cursor = new ExecutionBlockCursor(plan, true);
-
-    StringBuilder sb = new StringBuilder("\n=======================================================");
-    sb.append("\nThe order of execution: \n");
-    int order = 1;
-    while (cursor.hasNext()) {
-      ExecutionBlock currentEB = cursor.nextBlock();
-      sb.append("\n").append(order).append(": ").append(currentEB.getId());
-      order++;
-    }
-    sb.append("\n=======================================================");
-    LOG.info(sb);
-    cursor.reset();
-
-    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-    this.readLock = readWriteLock.readLock();
-    this.writeLock = readWriteLock.writeLock();
-
-    stateMachine = stateMachineFactory.make(this);
-    queryState = stateMachine.getCurrentState();
-  }
-
-  public float getProgress() {
-    QueryState state = getState();
-    if (state == QueryState.QUERY_SUCCEEDED) {
-      return 1.0f;
-    } else {
-      int idx = 0;
-      List<Stage> tempStages = new ArrayList<Stage>();
-      synchronized(stages) {
-        tempStages.addAll(stages.values());
-      }
-
-      float [] subProgresses = new float[tempStages.size()];
-      for (Stage stage: tempStages) {
-        if (stage.getState() != StageState.NEW) {
-          subProgresses[idx] = stage.getProgress();
-        } else {
-          subProgresses[idx] = 0.0f;
-        }
-        idx++;
-      }
-
-      float totalProgress = 0.0f;
-      float proportion = 1.0f / (float)(getExecutionBlockCursor().size() - 1); // minus one excludes the terminal block
-
-      for (int i = 0; i < subProgresses.length; i++) {
-        totalProgress += subProgresses[i] * proportion;
-      }
-
-      return totalProgress;
-    }
-  }
-
-  public long getAppSubmitTime() {
-    return this.appSubmitTime;
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public void setStartTime() {
-    startTime = clock.getTime();
-  }
-
-  public long getFinishTime() {
-    return finishTime;
-  }
-
-  public void setFinishTime() {
-    finishTime = clock.getTime();
-  }
-
-  public QueryHistory getQueryHistory() {
-    QueryHistory queryHistory = makeQueryHistory();
-    queryHistory.setStageHistories(makeStageHistories());
-    return queryHistory;
-  }
-
-  private List<StageHistory> makeStageHistories() {
-    List<StageHistory> stageHistories = new ArrayList<StageHistory>();
-    for(Stage eachStage : getStages()) {
-      stageHistories.add(eachStage.getStageHistory());
-    }
-
-    return stageHistories;
-  }
-
-  private QueryHistory makeQueryHistory() {
-    QueryHistory queryHistory = new QueryHistory();
-
-    queryHistory.setQueryId(getId().toString());
-    queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName());
-    queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort());
-    queryHistory.setLogicalPlan(plan.getLogicalPlan().toString());
-    queryHistory.setDistributedPlan(plan.toString());
-
-    List<String[]> sessionVariables = new ArrayList<String[]>();
-    for(Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) {
-      if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) {
-        sessionVariables.add(new String[]{entry.getKey(), entry.getValue()});
-      }
-    }
-    queryHistory.setSessionVariables(sessionVariables);
-
-    return queryHistory;
-  }
-
-  public List<String> getDiagnostics() {
-    readLock.lock();
-    try {
-      return diagnostics;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  protected void addDiagnostic(String diag) {
-    diagnostics.add(diag);
-  }
-
-  public TableDesc getResultDesc() {
-    return resultDesc;
-  }
-
-  public void setResultDesc(TableDesc desc) {
-    resultDesc = desc;
-  }
-
-  public MasterPlan getPlan() {
-    return plan;
-  }
-
-  public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
-    return stateMachine;
-  }
-  
-  public void addStage(Stage stage) {
-    stages.put(stage.getId(), stage);
-  }
-  
-  public QueryId getId() {
-    return this.id;
-  }
-
-  public Stage getStage(ExecutionBlockId id) {
-    return this.stages.get(id);
-  }
-
-  public Collection<Stage> getStages() {
-    return this.stages.values();
-  }
-
-  public QueryState getSynchronizedState() {
-    readLock.lock();
-    try {
-      return stateMachine.getCurrentState();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  /* non-blocking call for client API */
-  public QueryState getState() {
-    return queryState;
-  }
-
-  public ExecutionBlockCursor getExecutionBlockCursor() {
-    return cursor;
-  }
-
-  public static class StartTransition
-      implements SingleArcTransition<Query, QueryEvent> {
-
-    @Override
-    public void transition(Query query, QueryEvent queryEvent) {
-
-      query.setStartTime();
-      Stage stage = new Stage(query.context, query.getPlan(),
-          query.getExecutionBlockCursor().nextBlock());
-      stage.setPriority(query.priority--);
-      query.addStage(stage);
-
-      stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
-      LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
-    }
-  }
-
-  public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
-
-    @Override
-    public QueryState transition(Query query, QueryEvent queryEvent) {
-      QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent;
-      QueryState finalState;
-
-      if (stageEvent.getState() == StageState.SUCCEEDED) {
-        finalState = finalizeQuery(query, stageEvent);
-      } else if (stageEvent.getState() == StageState.FAILED) {
-        finalState = QueryState.QUERY_FAILED;
-      } else if (stageEvent.getState() == StageState.KILLED) {
-        finalState = QueryState.QUERY_KILLED;
-      } else {
-        finalState = QueryState.QUERY_ERROR;
-      }
-      if (finalState != QueryState.QUERY_SUCCEEDED) {
-        Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
-        if (lastStage != null && lastStage.getTableMeta() != null) {
-          StoreType storeType = lastStage.getTableMeta().getStoreType();
-          if (storeType != null) {
-            LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
-            try {
-              StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
-            } catch (IOException e) {
-              LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
-            }
-          }
-        }
-      }
-      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
-      query.setFinishTime();
-
-      return finalState;
-    }
-
-    private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
-      Stage lastStage = query.getStage(event.getExecutionBlockId());
-      StoreType storeType = lastStage.getTableMeta().getStoreType();
-      try {
-        LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
-        CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
-        TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
-
-        Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
-            .commitOutputData(query.context.getQueryContext(),
-                lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
-
-        QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
-        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
-      } catch (Exception e) {
-        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
-        return QueryState.QUERY_ERROR;
-      }
-
-      return QueryState.QUERY_SUCCEEDED;
-    }
-
-    private interface QueryHook {
-      boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
-      void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
-                   ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
-    }
-
-    private class QueryHookExecutor {
-      private List<QueryHook> hookList = TUtil.newList();
-      private QueryMaster.QueryMasterContext context;
-
-      public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
-        this.context = context;
-        hookList.add(new MaterializedResultHook());
-        hookList.add(new CreateTableHook());
-        hookList.add(new InsertTableHook());
-      }
-
-      public void execute(QueryContext queryContext, Query query,
-                          ExecutionBlockId finalExecBlockId,
-                          Path finalOutputDir) throws Exception {
-        for (QueryHook hook : hookList) {
-          if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
-            hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
-          }
-        }
-      }
-    }
-
-    private class MaterializedResultHook implements QueryHook {
-
-      @Override
-      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
-                                Path finalOutputDir) {
-        Stage lastStage = query.getStage(finalExecBlockId);
-        NodeType type = lastStage.getBlock().getPlan().getType();
-        return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
-      }
-
-      @Override
-      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
-                          Query query, ExecutionBlockId finalExecBlockId,
-                          Path finalOutputDir) throws Exception {
-        Stage lastStage = query.getStage(finalExecBlockId);
-        TableMeta meta = lastStage.getTableMeta();
-
-        String nullChar = queryContext.get(SessionVars.NULL_CHAR);
-        meta.putOption(StorageConstants.TEXT_NULL, nullChar);
-
-        TableStats stats = lastStage.getResultStats();
-
-        TableDesc resultTableDesc =
-            new TableDesc(
-                query.getId().toString(),
-                lastStage.getSchema(),
-                meta,
-                finalOutputDir.toUri());
-        resultTableDesc.setExternal(true);
-
-        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
-        resultTableDesc.setStats(stats);
-        query.setResultDesc(resultTableDesc);
-      }
-    }
-
-    private class CreateTableHook implements QueryHook {
-
-      @Override
-      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
-                                Path finalOutputDir) {
-        Stage lastStage = query.getStage(finalExecBlockId);
-        return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
-      }
-
-      @Override
-      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
-                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
-        CatalogService catalog = context.getWorkerContext().getCatalog();
-        Stage lastStage = query.getStage(finalExecBlockId);
-        TableStats stats = lastStage.getResultStats();
-
-        CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
-        TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions());
-
-        TableDesc tableDescToBeCreated =
-            new TableDesc(
-                createTableNode.getTableName(),
-                createTableNode.getTableSchema(),
-                meta,
-                finalOutputDir.toUri());
-        tableDescToBeCreated.setExternal(createTableNode.isExternal());
-
-        if (createTableNode.hasPartition()) {
-          tableDescToBeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
-        }
-
-        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
-        tableDescToBeCreated.setStats(stats);
-        query.setResultDesc(tableDescToBeCreated);
-
-        catalog.createTable(tableDescToBeCreated);
-      }
-    }
-
-    private class InsertTableHook implements QueryHook {
-
-      @Override
-      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
-                                Path finalOutputDir) {
-        Stage lastStage = query.getStage(finalExecBlockId);
-        return lastStage.getBlock().getPlan().getType() == NodeType.INSERT;
-      }
-
-      @Override
-      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
-                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
-          throws Exception {
-
-        CatalogService catalog = context.getWorkerContext().getCatalog();
-        Stage lastStage = query.getStage(finalExecBlockId);
-        TableMeta meta = lastStage.getTableMeta();
-        TableStats stats = lastStage.getResultStats();
-
-        InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
-
-        TableDesc finalTable;
-        if (insertNode.hasTargetTable()) {
-          String tableName = insertNode.getTableName();
-          finalTable = catalog.getTableDesc(tableName);
-        } else {
-          String tableName = query.getId().toString();
-          finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir.toUri());
-        }
-
-        long volume = getTableVolume(query.systemConf, finalOutputDir);
-        stats.setNumBytes(volume);
-        finalTable.setStats(stats);
-
-        if (insertNode.hasTargetTable()) {
-          UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder();
-          builder.setTableName(finalTable.getName());
-          builder.setStats(stats.getProto());
-
-          catalog.updateTableStats(builder.build());
-        }
-
-        query.setResultDesc(finalTable);
-      }
-    }
-  }
-
-  public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(systemConf);
-    ContentSummary directorySummary = fs.getContentSummary(tablePath);
-    return directorySummary.getLength();
-  }
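
A usage sketch for getTableVolume() (the table path below is hypothetical):
Hadoop's FileSystem.getContentSummary() walks the directory tree and returns
the aggregate byte length of every file beneath it, which is the figure the
query hooks above record through TableStats.setNumBytes().

    TajoConf conf = new TajoConf();
    Path tableDir = new Path("hdfs:///user/tajo/warehouse/t1");
    long volume = Query.getTableVolume(conf, tableDir);  // total bytes under tableDir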
-
-  public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
-
-    private boolean hasNext(Query query) {
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      ExecutionBlock nextBlock = cursor.peek();
-      return !query.getPlan().isTerminal(nextBlock);
-    }
-
-    private void executeNextBlock(Query query) {
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      ExecutionBlock nextBlock = cursor.nextBlock();
-      Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
-      nextStage.setPriority(query.priority--);
-      query.addStage(nextStage);
-      nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
-
-      LOG.info("Scheduling Stage:" + nextStage.getId());
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
-        LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
-      }
-    }
-
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      try {
-        query.completedStagesCount++;
-        StageCompletedEvent castEvent = (StageCompletedEvent) event;
-
-        if (castEvent.getState() == StageState.SUCCEEDED) {
-          query.successedStagesCount++;
-        } else if (castEvent.getState() == StageState.KILLED) {
-          query.killedStagesCount++;
-        } else if (castEvent.getState() == StageState.FAILED) {
-          query.failedStagesCount++;
-        } else if (castEvent.getState() == StageState.ERROR) {
-          query.erroredStagesCount++;
-        } else {
-          LOG.error(String.format("Invalid Stage (%s) State %s at %s",
-              castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name()));
-          query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
-        }
-
-        // if the stage succeeded and the query is still running
-        if (castEvent.getState() == StageState.SUCCEEDED &&  // latest stage succeeded
-            query.getSynchronizedState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
-            hasNext(query)) {                                   // there remains at least one stage.
-          query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
-          executeNextBlock(query);
-        } else { // the query has completed: finished, killed, failed, or errored
-          query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
-        }
-      } catch (Throwable t) {
-        LOG.error(t.getMessage(), t);
-        query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
-      }
-    }
-  }
-
-  private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
-    }
-  }
-
-  private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      query.setFinishTime();
-      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
-    }
-  }
-
-  private static class KillAllStagesTransition implements SingleArcTransition<Query, QueryEvent> {
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      synchronized (query.stages) {
-        for (Stage stage : query.stages.values()) {
-          query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
-        }
-      }
-    }
-  }
-
-  private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
-
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      query.setFinishTime();
-      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
-    }
-  }
-
-  @Override
-  public void handle(QueryEvent event) {
-    LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
-    writeLock.lock();
-    try {
-      QueryState oldState = getSynchronizedState();
-      try {
-        getStateMachine().doTransition(event.getType(), event);
-        queryState = getSynchronizedState();
-      } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state"
-            + ", type:" + event
-            + ", oldState:" + oldState.name()
-            + ", nextState:" + getSynchronizedState().name()
-            , e);
-        eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
-      }
-
-      // notify the event handler of the state change
-      if (oldState != getSynchronizedState()) {
-        LOG.info(id + " Query Transitioned from " + oldState + " to " + getSynchronizedState());
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-}
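
Query drives its lifecycle with Hadoop YARN's state-machine library
(org.apache.hadoop.yarn.state): all transitions are declared once in a static
StateMachineFactory, installTopology() freezes the topology, and make() builds
a per-instance machine. Below is a minimal, self-contained sketch of the same
pattern; Job, JobState, JobEventType, and JobEvent are illustrative names, not
Tajo classes, and hadoop-yarn-common is assumed on the classpath.

    import org.apache.hadoop.yarn.state.SingleArcTransition;
    import org.apache.hadoop.yarn.state.StateMachine;
    import org.apache.hadoop.yarn.state.StateMachineFactory;

    public class Job {
      public enum JobState { NEW, RUNNING, DONE, ERROR }
      public enum JobEventType { START, FINISH, FAIL }
      public static class JobEvent { /* event payload omitted */ }

      // Declared once and shared by every Job instance, like
      // Query.stateMachineFactory above.
      private static final StateMachineFactory<Job, JobState, JobEventType, JobEvent> factory =
          new StateMachineFactory<Job, JobState, JobEventType, JobEvent>(JobState.NEW)
              .addTransition(JobState.NEW, JobState.RUNNING, JobEventType.START,
                  new SingleArcTransition<Job, JobEvent>() {
                    @Override
                    public void transition(Job job, JobEvent event) {
                      // per-transition side effects on the operand go here;
                      // Query.StartTransition, for example, schedules the first Stage
                    }
                  })
              .addTransition(JobState.RUNNING, JobState.DONE, JobEventType.FINISH)
              .addTransition(JobState.RUNNING, JobState.ERROR, JobEventType.FAIL)
              .installTopology();

      private final StateMachine<JobState, JobEventType, JobEvent> stateMachine =
          factory.make(this);

      // Query.handle() wraps this call in a write lock and maps an
      // InvalidStateTransitonException to an INTERNAL_ERROR event.
      public JobState handle(JobEventType type, JobEvent event) {
        return stateMachine.doTransition(type, event);
      }
    }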

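A worked example of the weighting inside getProgress() (plain Java, no Tajo
types): the ExecutionBlockCursor counts the terminal block, so each real stage
contributes 1 / (size - 1) of the total progress.

    public class ProgressExample {
      public static void main(String[] args) {
        float[] stageProgress = { 1.0f, 0.5f, 0.0f };      // per-stage progress in [0, 1]
        int cursorSize = 4;                                // three stages plus the terminal block
        float proportion = 1.0f / (float) (cursorSize - 1);

        float total = 0.0f;
        for (float p : stageProgress) {
          total += p * proportion;                         // every stage weighted equally
        }
        System.out.println(total);                         // prints 0.5
      }
    }
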
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
deleted file mode 100644
index 0a87990..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.master.session.Session;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-
-public class QueryInProgress extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
-
-  private QueryId queryId;
-
-  private Session session;
-
-  private AsyncDispatcher dispatcher;
-
-  private LogicalRootNode plan;
-
-  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private QueryInfo queryInfo;
-
-  private final TajoMaster.MasterContext masterContext;
-
-  private NettyClientBase queryMasterRpc;
-
-  private QueryMasterProtocolService queryMasterRpcClient;
-
-  public QueryInProgress(
-      TajoMaster.MasterContext masterContext,
-      Session session,
-      QueryContext queryContext,
-      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
-    super(QueryInProgress.class.getName());
-    this.masterContext = masterContext;
-    this.session = session;
-    this.queryId = queryId;
-    this.plan = plan;
-
-    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
-    queryInfo.setStartTime(System.currentTimeMillis());
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    dispatcher = new AsyncDispatcher();
-    this.addService(dispatcher);
-
-    dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
-    super.init(conf);
-  }
-
-  public synchronized void kill() {
-    if(queryMasterRpcClient != null){
-      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
-    }
-  }
-
-  @Override
-  public void stop() {
-    if(stopped.getAndSet(true)) {
-      return;
-    }
-
-    LOG.info("=========================================================");
-    LOG.info("Stop query:" + queryId);
-
-    masterContext.getResourceManager().stopQueryMaster(queryId);
-
-    long startTime = System.currentTimeMillis();
-    while(true) {
-      try {
-        if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
-          LOG.info(queryId + " QueryMaster stopped");
-          break;
-        }
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        break;
-      }
-
-      try {
-        synchronized (this){
-          wait(100);
-        }
-      } catch (InterruptedException e) {
-        break;
-      }
-      if(System.currentTimeMillis() - startTime > 60 * 1000) {
-        LOG.warn("Failed to stop QueryMaster:" + queryId);
-        break;
-      }
-    }
-
-    if(queryMasterRpc != null) {
-      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
-    }
-
-    masterContext.getHistoryWriter().appendHistory(queryInfo);
-    super.stop();
-  }
-
-  @Override
-  public void start() {
-    super.start();
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-
-  public boolean startQueryMaster() {
-    try {
-      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
-      WorkerResourceManager resourceManager = masterContext.getResourceManager();
-      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
-
-      // no resources are available to allocate a QueryMaster
-      if(resource == null) {
-        LOG.info("No Available Resources for QueryMaster");
-        return false;
-      }
-
-      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
-      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
-      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
-      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
-
-      getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
-
-      return true;
-    } catch (Exception e) {
-      catchException(e);
-      return false;
-    }
-  }
-
-  class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
-    @Override
-    public void handle(QueryJobEvent queryJobEvent) {
-      if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
-        heartbeat(queryJobEvent.getQueryInfo());
-      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
-        QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
-        queryInProgress.getEventHandler().handle(
-            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
-      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
-        submitQueryToMaster();
-      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        kill();
-      }
-    }
-  }
-
-  private void connectQueryMaster() throws Exception {
-    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
-    LOG.info("Connect to QueryMaster:" + addr);
-    queryMasterRpc =
-        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
-    queryMasterRpcClient = queryMasterRpc.getStub();
-  }
-
-  private synchronized void submitQueryToMaster() {
-    if(querySubmitted.get()) {
-      return;
-    }
-
-    try {
-      if(queryMasterRpcClient == null) {
-        connectQueryMaster();
-      }
-      if(queryMasterRpcClient == null) {
-        LOG.info("No QueryMaster conneciton info.");
-        //TODO wait
-        return;
-      }
-      LOG.info("Call executeQuery to :" +
-          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
-
-      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
-      builder.setQueryId(queryId.getProto())
-          .setQueryContext(queryInfo.getQueryContext().getProto())
-          .setSession(session.getProto())
-          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
-          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
-
-      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
-      querySubmitted.set(true);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  public void catchException(Exception e) {
-    LOG.error(e.getMessage(), e);
-    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
-    queryInfo.setLastMessage(StringUtils.stringifyException(e));
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  public QueryInfo getQueryInfo() {
-    return this.queryInfo;
-  }
-
-  public boolean isStarted() {
-    return !stopped.get() && this.querySubmitted.get();
-  }
-
-  private void heartbeat(QueryInfo queryInfo) {
-    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
-
-    // to avoid partial updates from different heartbeats
-    synchronized (this.queryInfo) {
-
-      // A terminal state lets the client retrieve the query result,
-      // so we must set the result before changing the query state.
-      if (isFinishState(queryInfo.getQueryState())) {
-        if (queryInfo.hasResultdesc()) {
-          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
-        }
-      }
-
-      this.queryInfo.setQueryState(queryInfo.getQueryState());
-      this.queryInfo.setProgress(queryInfo.getProgress());
-      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
-
-      // Update diagnosis message
-      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
-        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
-        LOG.info(queryId + ": " + queryInfo.getLastMessage());
-      }
-
-      // if any error occurs, log the error message
-      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
-        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
-      }
-
-      if (isFinishState(this.queryInfo.getQueryState())) {
-        masterContext.getQueryJobManager().getEventHandler().handle(
-            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
-      }
-    }
-  }
-
-  private boolean isFinishState(TajoProtos.QueryState state) {
-    return state == TajoProtos.QueryState.QUERY_FAILED ||
-        state == TajoProtos.QueryState.QUERY_KILLED ||
-        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
-  }
-}
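
To close, a hedged sketch of how a TajoMaster-side caller drives QueryInProgress
through the surface shown above. This is a fragment, not compilable on its own:
masterContext, session, queryContext, queryId, sql, jsonExpr, rootNode, and
remoteInfo are assumed to be in scope and are not defined in this diff.

    QueryInProgress qip = new QueryInProgress(masterContext, session, queryContext,
        queryId, sql, jsonExpr, rootNode);
    qip.init(masterContext.getConf());
    qip.start();

    // Ask the resource manager for a QueryMaster; false means no resource was available.
    if (qip.startQueryMaster()) {
      // QueryMaster heartbeats arrive as QUERY_JOB_HEARTBEAT events and are
      // merged into the shared QueryInfo by heartbeat().
      qip.getEventHandler().handle(
          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, remoteInfo));
    }

    if (qip.isStarted()) {
      qip.kill();   // forwards killQuery() over the QueryMaster RPC
    }
    qip.stop();     // waits up to 60 seconds for the QueryMaster, then closes the RPC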