You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/09 04:23:37 UTC

[2/3] TAJO-149: Eliminate QueryConf and its file write. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index d1e92be..c4b6ae0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -18,28 +18,29 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableDescImpl;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.master.ExecutionBlock;
 import org.apache.tajo.master.ExecutionBlockCursor;
+import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -54,14 +55,14 @@ public class Query implements EventHandler<QueryEvent> {
   private static final Log LOG = LogFactory.getLog(Query.class);
 
   // Facilities for Query
-  private final QueryConf conf;
+  private final TajoConf systemConf;
   private final Clock clock;
   private String queryStr;
   private Map<ExecutionBlockId, SubQuery> subqueries;
   private final EventHandler eventHandler;
   private final MasterPlan plan;
   private final StorageManager sm;
-  QueryMasterTask.QueryContext context;
+  QueryMasterTask.QueryMasterTaskContext context;
   private ExecutionBlockCursor cursor;
 
   // Query Status
@@ -108,13 +109,13 @@ public class Query implements EventHandler<QueryEvent> {
 
       .installTopology();
 
-  public Query(final QueryMasterTask.QueryContext context, final QueryId id,
+  public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
                final long appSubmitTime,
                final String queryStr,
                final EventHandler eventHandler,
                final MasterPlan plan) {
     this.context = context;
-    this.conf = context.getConf();
+    this.systemConf = context.getConf();
     this.id = id;
     this.clock = context.getClock();
     this.appSubmitTime = appSubmitTime;
@@ -308,31 +309,21 @@ public class Query implements EventHandler<QueryEvent> {
 
         } else { // Finish a query
           if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-            SubQuery subQuery = query.getSubQuery(castEvent.getExecutionBlockId());
-            TableDesc outputTableDesc = new TableDescImpl(query.context.getQueryMeta().getOutputTable(),
-                subQuery.getTableMeta(), query.context.getQueryMeta().getOutputPath());
-            query.setResultDesc(outputTableDesc);
-
-            if (!query.context.getQueryMeta().isFileOutput()) {
-              try {
-                query.writeStat(query.context.getQueryMeta().getOutputPath(), subQuery);
-              } catch (IOException e) {
-                e.printStackTrace();
+
+            Path finalOutputDir = commitOutputData(query);
+            TableDesc finalTableDesc = buildOrUpdateResultTableDesc(query, castEvent.getExecutionBlockId(), finalOutputDir);
+
+            QueryContext queryContext = query.context.getQueryContext();
+            CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
+
+            if (queryContext.hasOutputTable()) { // TRUE only if a query command is 'CREATE TABLE' OR 'INSERT INTO'
+              if (queryContext.isOutputOverwrite()) { // TRUE only if a query is 'INSERT OVERWRITE INTO'
+                catalog.deleteTable(finalOutputDir.getName());
               }
+              catalog.addTable(finalTableDesc);
             }
+            query.setResultDesc(finalTableDesc);
             query.eventHandler.handle(new QueryFinishEvent(query.getId()));
-
-            StoreTableNode storeTableNode = (StoreTableNode) PlannerUtil.findTopNode(subQuery.getBlock().getPlan(),
-                NodeType.STORE);
-            if (storeTableNode.isCreatedTable()) {
-              query.context.getQueryMasterContext().getWorkerContext().getCatalog().addTable(outputTableDesc);
-            } else if (storeTableNode.isOverwrite() && !query.context.getQueryMeta().isFileOutput()) {
-              CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
-              TableDesc updatingTable = catalog.getTableDesc(outputTableDesc.getName());
-              updatingTable.getMeta().setStat(outputTableDesc.getMeta().getStat());
-              catalog.deleteTable(outputTableDesc.getName());
-              catalog.addTable(updatingTable);
-            }
           }
 
           return query.finished(QueryState.QUERY_SUCCEEDED);
@@ -342,14 +333,58 @@ public class Query implements EventHandler<QueryEvent> {
         return QueryState.QUERY_FAILED;
       }
     }
-  }
 
-  private static class DiagnosticsUpdateTransition implements
-      SingleArcTransition<Query, QueryEvent> {
-    @Override
-    public void transition(Query query, QueryEvent event) {
-      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event)
-          .getDiagnosticUpdate());
+    /**
+     * It moves a result data stored in a staging output dir into a final output dir.
+     */
+    public Path commitOutputData(Query query) {
+      QueryContext queryContext = query.context.getQueryContext();
+      Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
+      Path finalOutputDir;
+      if (queryContext.hasOutputPath()) {
+        finalOutputDir = queryContext.getOutputPath();
+        try {
+          FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
+          fs.rename(stagingResultDir, finalOutputDir);
+          LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      } else {
+        finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
+      }
+
+      return finalOutputDir;
+    }
+
+    /**
+     * It builds a table desc and update the table desc if necessary.
+     */
+    public TableDesc buildOrUpdateResultTableDesc(Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
+      // Determine the output table name
+      SubQuery subQuery = query.getSubQuery(finalExecBlockId);
+      QueryContext queryContext = query.context.getQueryContext();
+      String outputTableName;
+      if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
+        outputTableName = queryContext.getOutputTable();
+      } else { // SELECT STATEMENT
+        outputTableName = query.getId().toString();
+      }
+
+      TableDesc outputTableDesc = new TableDescImpl(outputTableName, subQuery.getTableMeta(), finalOutputDir);
+      TableDesc finalTableDesc = outputTableDesc;
+
+      // If a query has a target table, a TableDesc is updated.
+      if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
+        if (queryContext.isOutputOverwrite()) {
+          CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
+          Preconditions.checkNotNull(catalog, "CatalogService is NULL");
+          TableDesc updatingTable = catalog.getTableDesc(outputTableDesc.getName());
+          updatingTable.getMeta().setStat(outputTableDesc.getMeta().getStat());
+          finalTableDesc = updatingTable;
+        }
+      }
+      return finalTableDesc;
     }
   }
 
@@ -415,9 +450,32 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  private void writeStat(Path outputPath, SubQuery subQuery)
-      throws IOException {
-    ExecutionBlock execBlock = subQuery.getBlock();
-    sm.writeTableMeta(outputPath, subQuery.getTableMeta());
+  public static interface QueryHook {
+    QueryState getTargetState();
+    void onEvent(Query query);
+  }
+
+  public static class QueryHookManager {
+    Map<QueryState, List<QueryHook>> hookList = TUtil.newHashMap();
+
+    public void addHook(QueryHook hook) {
+      if (hookList.containsKey(hook.getTargetState())) {
+        hookList.get(hook.getTargetState()).add(hook);
+      } else {
+        hookList.put(hook.getTargetState(), TUtil.newList(hook));
+      }
+    }
+
+    public void doHooks(Query query) {
+      QueryState finalState = query.checkQueryForCompleted();
+      List<QueryHook> list = hookList.get(finalState);
+      if (list != null) {
+        for (QueryHook hook : list) {
+          hook.onEvent(query);
+        }
+      } else {
+        LOG.error("QueryHookManager cannot deal with " + finalState + " event");
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index f6cf02e..809dce2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -29,7 +29,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
@@ -46,7 +46,7 @@ public class QueryInProgress extends CompositeService {
 
   private QueryId queryId;
 
-  private QueryMeta queryMeta;
+  private QueryContext queryContext;
 
   private TajoAsyncDispatcher dispatcher;
 
@@ -66,11 +66,11 @@ public class QueryInProgress extends CompositeService {
 
   public QueryInProgress(
       TajoMaster.MasterContext masterContext,
-      QueryMeta queryMeta,
+      QueryContext queryContext,
       QueryId queryId, String sql, LogicalRootNode plan) {
     super(QueryInProgress.class.getName());
     this.masterContext = masterContext;
-    this.queryMeta = queryMeta;
+    this.queryContext = queryContext;
     this.queryId = queryId;
     this.plan = plan;
 
@@ -105,7 +105,7 @@ public class QueryInProgress extends CompositeService {
     while(true) {
       try {
         if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
-          LOG.info("====> " + queryId + " QueryMaster stopped");
+          LOG.info(queryId + " QueryMaster stopped");
           queryMasterStopped = true;
           break;
         }
@@ -206,7 +206,7 @@ public class QueryInProgress extends CompositeService {
           null,
           TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
               .setQueryId(queryId.getProto())
-              .setQueryMeta(queryMeta.getProto())
+              .setQueryContext(queryContext.getProto())
               .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
               .build(), NullCallback.get());
       querySubmitted.set(true);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index fb92616..424d5bf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -28,7 +28,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
 
@@ -85,9 +85,9 @@ public class QueryJobManager extends CompositeService {
     return dispatcher.getEventHandler();
   }
 
-  public QueryInfo createNewQueryJob(QueryMeta queryMeta, String sql, LogicalRootNode plan) throws Exception {
+  public QueryInfo createNewQueryJob(QueryContext queryContext, String sql, LogicalRootNode plan) throws Exception {
     QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    QueryInProgress queryInProgress = new QueryInProgress(masterContext,queryMeta, queryId, sql, plan);
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryContext, queryId, sql, plan);
 
     synchronized(runningQueries) {
       runningQueries.put(queryId, queryInProgress);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 53b9c05..d45988c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -21,16 +21,15 @@ package org.apache.tajo.master.querymaster;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalOptimizer;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.GlobalPlanner;
@@ -62,7 +61,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private StorageManager storageManager;
 
-  private QueryConf queryConf;
+  private TajoConf systemConf;
 
   private Map<QueryId, QueryMasterTask> queryMasterTasks = new HashMap<QueryId, QueryMasterTask>();
 
@@ -84,20 +83,19 @@ public class QueryMaster extends CompositeService implements EventHandler {
   public void init(Configuration conf) {
     LOG.info("QueryMaster init");
     try {
-      queryConf = new QueryConf(conf);
-      queryConf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
+      systemConf = (TajoConf)conf;
 
       QUERY_SESSION_TIMEOUT = 60 * 1000;
-      queryMasterContext = new QueryMasterContext(queryConf);
+      queryMasterContext = new QueryMasterContext(systemConf);
 
       clock = new SystemClock();
 
       this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
       addIfService(dispatcher);
 
-      this.storageManager = new StorageManager(queryConf);
+      this.storageManager = new StorageManager(systemConf);
 
-      globalPlanner = new GlobalPlanner(queryConf, storageManager, dispatcher.getEventHandler());
+      globalPlanner = new GlobalPlanner(systemConf, storageManager, dispatcher.getEventHandler());
       globalOptimizer = new GlobalOptimizer();
 
       dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
@@ -188,13 +186,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
   }
 
   public class QueryMasterContext {
-    private QueryConf conf;
+    private TajoConf conf;
 
-    public QueryMasterContext(QueryConf conf) {
+    public QueryMasterContext(TajoConf conf) {
       this.conf = conf;
     }
 
-    public QueryConf getConf() {
+    public TajoConf getConf() {
       return conf;
     }
 
@@ -255,9 +253,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
       LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
       //To change body of implemented methods use File | Settings | File Templates.
       QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
-          event.getQueryId(), event.getQueryMeta(), event.getLogicalPlanJson());
+          event.getQueryId(), event.getQueryContext(), event.getLogicalPlanJson());
 
-      queryMasterTask.init(queryConf);
+      queryMasterTask.init(systemConf);
       queryMasterTask.start();
       synchronized(queryMasterTasks) {
         queryMasterTasks.put(event.getQueryId(), queryMasterTask);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
index 5a79464..ae1c1e5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -25,8 +25,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.PrintWriter;
@@ -34,9 +35,10 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 
+@Deprecated
 public class QueryMasterRunner extends AbstractService {
   private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class);
-  private QueryConf queryConf;
+  private TajoConf systemConf;
   private QueryMaster queryMaster;
   private QueryId queryId;
   private String queryMasterManagerAddress;
@@ -59,11 +61,9 @@ public class QueryMasterRunner extends AbstractService {
 
   @Override
   public void init(Configuration conf) {
-    this.queryConf = (QueryConf)conf;
-    RackResolver.init(queryConf);
-
+    this.systemConf = (TajoConf)conf;
+    RackResolver.init(systemConf);
     Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
-
     super.init(conf);
   }
 
@@ -72,7 +72,7 @@ public class QueryMasterRunner extends AbstractService {
     //create QueryMaster
     QueryMaster query = new QueryMaster(null);
 
-    query.init(queryConf);
+    query.init(systemConf);
     query.start();
   }
 
@@ -83,8 +83,8 @@ public class QueryMasterRunner extends AbstractService {
   public static void main(String[] args) throws Exception {
     LOG.info("QueryMasterRunner started");
 
-    final QueryConf conf = new QueryConf();
-    conf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
+    final TajoConf conf = new TajoConf();
+    conf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
 
     UserGroupInformation.setConfiguration(conf);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index f348e44..2212f82 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -39,13 +39,12 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.worker.AbstractResourceAllocator;
 import org.apache.tajo.worker.TajoResourceAllocator;
 import org.apache.tajo.worker.YarnResourceAllocator;
@@ -60,15 +59,15 @@ public class QueryMasterTask extends CompositeService {
   private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
 
   // query submission directory is private!
-  final public static FsPermission USER_DIR_PERMISSION =
+  final public static FsPermission STAGING_DIR_PERMISSION =
       FsPermission.createImmutable((short) 0700); // rwx--------
 
   private QueryId queryId;
 
-  private QueryMeta queryMeta;
-
   private QueryContext queryContext;
 
+  private QueryMasterTaskContext queryTaskContext;
+
   private QueryMaster.QueryMasterContext queryMasterContext;
 
   private Query query;
@@ -81,11 +80,9 @@ public class QueryMasterTask extends CompositeService {
 
   private final long querySubmitTime;
 
-  private Path outputPath;
-
   private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
 
-  private QueryConf queryConf;
+  private TajoConf systemConf;
 
   private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
 
@@ -94,28 +91,28 @@ public class QueryMasterTask extends CompositeService {
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
-                         QueryId queryId, QueryMeta queryMeta, String logicalPlanJson) {
+                         QueryId queryId, QueryContext queryContext, String logicalPlanJson) {
     super(QueryMasterTask.class.getName());
     this.queryMasterContext = queryMasterContext;
     this.queryId = queryId;
-    this.queryMeta = queryMeta;
+    this.queryContext = queryContext;
     this.logicalPlanJson = logicalPlanJson;
     this.querySubmitTime = System.currentTimeMillis();
   }
 
   @Override
   public void init(Configuration conf) {
-    queryConf = new QueryConf(conf);
-    queryConf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
+    systemConf = (TajoConf)conf;
+
     try {
-      queryContext = new QueryContext();
+      queryTaskContext = new QueryMasterTaskContext();
       String resourceManagerClassName = conf.get("tajo.resource.manager",
           TajoWorkerResourceManager.class.getCanonicalName());
 
       if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
-        resourceAllocator = new TajoResourceAllocator(queryContext);
+        resourceAllocator = new TajoResourceAllocator(queryTaskContext);
       } else {
-        resourceAllocator = new YarnResourceAllocator(queryContext);
+        resourceAllocator = new YarnResourceAllocator(queryTaskContext);
       }
       addService(resourceAllocator);
 
@@ -130,7 +127,7 @@ public class QueryMasterTask extends CompositeService {
 
       initStagingDir();
 
-      super.init(queryConf);
+      super.init(systemConf);
     } catch (IOException e) {
       LOG.error(e.getMessage(), e);
     }
@@ -242,14 +239,14 @@ public class QueryMasterTask extends CompositeService {
       MasterPlan globalPlan = queryMasterContext.getGlobalPlanner().build(queryId, logicalNodeRoot);
       this.masterPlan = queryMasterContext.getGlobalOptimizer().optimize(globalPlan);
 
-      query = new Query(queryContext, queryId, querySubmitTime,
-          "", queryContext.getEventHandler(), masterPlan);
+      query = new Query(queryTaskContext, queryId, querySubmitTime,
+          "", queryTaskContext.getEventHandler(), masterPlan);
 
       dispatcher.register(QueryEventType.class, query);
 
-      queryContext.getEventHandler().handle(new QueryEvent(queryId,
+      queryTaskContext.getEventHandler().handle(new QueryEvent(queryId,
           QueryEventType.INIT));
-      queryContext.getEventHandler().handle(new QueryEvent(queryId,
+      queryTaskContext.getEventHandler().handle(new QueryEvent(queryId,
           QueryEventType.START));
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -264,88 +261,85 @@ public class QueryMasterTask extends CompositeService {
    * them to variables.
    */
   private void initStagingDir() throws IOException {
+
     String realUser;
     String currentUser;
     UserGroupInformation ugi;
     ugi = UserGroupInformation.getLoginUser();
     realUser = ugi.getShortUserName();
     currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    FileSystem defaultFS = FileSystem.get(systemConf);
 
-    String givenOutputTableName = queryMeta.getOutputTable();
-    Path stagingDir;
+    Path stagingDir = null;
+    Path outputDir = null;
+    try {
+      ////////////////////////////////////////////
+      // Create Output Directory
+      ////////////////////////////////////////////
 
-    // If final output directory is not given by an user,
-    // we use the query id as a output directory.
-    if (givenOutputTableName == null || givenOutputTableName.isEmpty()) {
-      FileSystem defaultFS = FileSystem.get(queryConf);
+      stagingDir = new Path(TajoConf.getStagingRoot(systemConf), queryId.toString());
 
-      Path homeDirectory = defaultFS.getHomeDirectory();
-      if (!defaultFS.exists(homeDirectory)) {
-        defaultFS.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
+      if (defaultFS.exists(stagingDir)) {
+        throw new IOException("The staging directory '" + stagingDir + "' already exists");
       }
-
-      Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
-
-      if (defaultFS.exists(userQueryDir)) {
-        FileStatus fsStatus = defaultFS.getFileStatus(userQueryDir);
-        String owner = fsStatus.getOwner();
-
-        if (!(owner.equals(currentUser) || owner.equals(realUser))) {
-          throw new IOException("The ownership on the user's query " +
-              "directory " + userQueryDir + " is not as expected. " +
-              "It is owned by " + owner + ". The directory must " +
-              "be owned by the submitter " + currentUser + " or " +
-              "by " + realUser);
-        }
-
-        if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
-          LOG.info("Permissions on staging directory " + userQueryDir + " are " +
-              "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
-              "to correct value " + USER_DIR_PERMISSION);
-          defaultFS.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
-        }
-      } else {
-        defaultFS.mkdirs(userQueryDir,
-            new FsPermission(USER_DIR_PERMISSION));
+      defaultFS.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+      FileStatus fsStatus = defaultFS.getFileStatus(stagingDir);
+      String owner = fsStatus.getOwner();
+
+      if (!(owner.equals(currentUser) || owner.equals(realUser))) {
+        throw new IOException("The ownership on the user's query " +
+            "directory " + stagingDir + " is not as expected. " +
+            "It is owned by " + owner + ". The directory must " +
+            "be owned by the submitter " + currentUser + " or " +
+            "by " + realUser);
       }
 
-      stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
-
-      if (defaultFS.exists(stagingDir)) {
-        throw new IOException("The staging directory " + stagingDir
-            + "already exists. The directory must be unique to each query");
-      } else {
-        defaultFS.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+      if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+        LOG.info("Permissions on staging directory " + stagingDir + " are " +
+            "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+            "to correct value " + STAGING_DIR_PERMISSION);
+        defaultFS.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
       }
 
-      // Set the query id to the output table name
-      queryMeta.setOutputTable(queryId.toString());
-
-    } else { // if a output table is given
-
-      Path warehouseDir = new Path(queryConf.getVar(TajoConf.ConfVars.ROOT_DIR),
-          TajoConstants.WAREHOUSE_DIR);
-      FileSystem fs = warehouseDir.getFileSystem(queryConf);
-
-      if (queryMeta.isFileOutput()) {
-        stagingDir = queryMeta.getOutputPath();
-      } else {
-        stagingDir = new Path(warehouseDir, queryMeta.getOutputTable());
+      // Create a subdirectories
+      defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
+      LOG.info("The staging dir '" + outputDir + "' is created.");
+      queryContext.setStagingDir(stagingDir);
+
+      ////////////////////////////////////////////////////
+      // Check and Create An Output Directory If Necessary
+      ////////////////////////////////////////////////////
+      if (queryContext.hasOutputPath()) {
+        outputDir = queryContext.getOutputPath();
+        if (queryContext.isOutputOverwrite()) {
+          if (defaultFS.exists(outputDir.getParent())) {
+            if (defaultFS.exists(outputDir)) {
+              defaultFS.delete(outputDir, true);
+              LOG.info("The output directory '" + outputDir + "' is cleaned.");
+            }
+          } else {
+            defaultFS.mkdirs(outputDir.getParent());
+            LOG.info("The output directory's parent '" + outputDir.getParent() + "' is created.");
+          }
+        } else {
+          if (defaultFS.exists(outputDir)) {
+            throw new IOException("The output directory '" + outputDir + " already exists.");
+          }
+        }
+      }
+    } catch (IOException ioe) {
+      if (stagingDir != null && defaultFS.exists(stagingDir)) {
+        defaultFS.delete(stagingDir, true);
+        LOG.info("The staging directory '" + stagingDir + "' is deleted");
       }
 
-      if (!queryMeta.isOutputOverwrite()) {
-        if (fs.exists(stagingDir)) {
-          throw new IOException("The staging directory " + stagingDir
-              + " already exists. The directory must be unique to each query");
-        }
+      if (outputDir != null && defaultFS.exists(outputDir)) {
+        defaultFS.delete(outputDir, true);
+        LOG.info("The output directory '" + outputDir + "' is deleted");
       }
 
-      fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+      throw ioe;
     }
-
-    queryMeta.setOutputPath(stagingDir);
-    outputPath = stagingDir;
-    LOG.info("Initialized Query Staging Dir: " + outputPath);
   }
 
   public Query getQuery() {
@@ -356,12 +350,12 @@ public class QueryMasterTask extends CompositeService {
     stop();
   }
 
-  public QueryContext getQueryContext() {
-    return queryContext;
+  public QueryMasterTaskContext getQueryTaskContext() {
+    return queryTaskContext;
   }
 
   public EventHandler getEventHandler() {
-    return queryContext.getEventHandler();
+    return queryTaskContext.getEventHandler();
   }
 
   public void touchSessionTime() {
@@ -384,18 +378,18 @@ public class QueryMasterTask extends CompositeService {
     }
   }
 
-  public class QueryContext {
+  public class QueryMasterTaskContext {
     EventHandler eventHandler;
     public QueryMaster.QueryMasterContext getQueryMasterContext() {
       return queryMasterContext;
     }
 
-    public QueryMeta getQueryMeta() {
-      return queryMeta;
+    public QueryContext getQueryContext() {
+      return queryContext;
     }
 
-    public QueryConf getConf() {
-      return queryConf;
+    public TajoConf getConf() {
+      return systemConf;
     }
 
     public Clock getClock() {
@@ -414,8 +408,8 @@ public class QueryMasterTask extends CompositeService {
       return queryMasterContext.getStorageManager();
     }
 
-    public Path getOutputPath() {
-      return outputPath;
+    public Path getStagingDir() {
+      return queryContext.getStagingDir();
     }
 
     public synchronized EventHandler getEventHandler() {
@@ -445,7 +439,7 @@ public class QueryMasterTask extends CompositeService {
     }
 
     public AbstractResourceAllocator getResourceAllocator() {
-      return (AbstractResourceAllocator)resourceAllocator;
+      return resourceAllocator;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 0cd5d64..43895d8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -69,14 +69,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private static final Log LOG = LogFactory.getLog(SubQuery.class);
 
-  private QueryMeta queryMeta;
+  private QueryContext queryContext;
   private ExecutionBlock block;
   private int priority;
   private TableMeta meta;
   private EventHandler eventHandler;
   private final StorageManager sm;
   private TaskSchedulerImpl taskScheduler;
-  private QueryMasterTask.QueryContext context;
+  private QueryMasterTask.QueryMasterTaskContext context;
 
   private long startTime;
   private long finishTime;
@@ -136,7 +136,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private int completedTaskCount = 0;
 
-  public SubQuery(QueryMasterTask.QueryContext context, ExecutionBlock block, StorageManager sm) {
+  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock block, StorageManager sm) {
     this.context = context;
     this.block = block;
     this.sm = sm;
@@ -153,7 +153,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         state == SubQueryState.CONTAINER_ALLOCATED || state == SubQueryState.RUNNING;
   }
 
-  public QueryMasterTask.QueryContext getContext() {
+  public QueryMasterTask.QueryMasterTaskContext getContext() {
     return context;
   }
 
@@ -335,9 +335,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     } else {
       stat = computeStatFromTasks();
     }
-    TableMeta meta = writeStat(this, stat);
+
+    StoreTableNode storeTableNode = getBlock().getStoreTableNode();
+    TableMeta meta = toTableMeta(storeTableNode);
     meta.setStat(stat);
-    setTableMeta(meta);
     return meta;
   }
 
@@ -356,7 +357,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     StoreTableNode storeTableNode = execBlock.getStoreTableNode();
     TableMeta meta = toTableMeta(storeTableNode);
     meta.setStat(stat);
-    sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
+    //sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
     return meta;
   }
 
@@ -597,7 +598,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       return maxTaskNum;
     }
 
-    public static long getInputVolume(QueryMasterTask.QueryContext context, ExecutionBlock execBlock) {
+    public static long getInputVolume(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock execBlock) {
       Map<String, TableDesc> tableMap = context.getTableDescMap();
       if (execBlock.isLeafBlock()) {
         ScanNode outerScan = execBlock.getScanNodes()[0];

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 3701d58..eb710fa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -211,15 +211,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
           LOG.info("====> allocateWorkerResources: allocated:" + workerResources.size());
 
-//          if(LOG.isDebugEnabled()) {
-//            LOG.debug("====> allocateWorkerResources:" +
-//                (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
-//                ", required:" + resourceRequest.request.getNumWorks() + ", allocated:" + workerResources.size());
-//          } else {
-//            LOG.info("====> allocateWorkerResources: required:" + resourceRequest.request.getNumWorks() +
-//                ", allocated:" + workerResources.size() + ", queryMasterRequest=" + resourceRequest.queryMasterRequest);
-//          }
-
           if(workerResources.size() > 0) {
             if(resourceRequest.queryMasterRequest) {
               startQueryMaster(resourceRequest.queryId, workerResources.get(0));
@@ -386,7 +377,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
         allWorkerResourceMap.put(workerResource.getId(), workerResource);
         liveWorkerResources.add(hostAndPort);
 
-        LOG.info("====> TajoWorker:" + workerResource + " added in live TajoWorker list");
+        LOG.info("TajoWorker:" + workerResource + " added in live TajoWorker list");
 
         workerResourceLock.notifyAll();
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
index 4392158..2202183 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
@@ -57,10 +57,10 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
   private static final Log LOG = LogFactory.getLog(YarnRMContainerAllocator.
       class.getName());
 
-  private QueryMasterTask.QueryContext context;
+  private QueryMasterTask.QueryMasterTaskContext context;
   private final EventHandler eventHandler;
 
-  public YarnRMContainerAllocator(QueryMasterTask.QueryContext context) {
+  public YarnRMContainerAllocator(QueryMasterTask.QueryMasterTaskContext context) {
     super(ApplicationIdUtils.createApplicationAttemptId(context.getQueryId()));
     this.context = context;
     this.eventHandler = context.getDispatcher().getEventHandler();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index fe31bc6..da7daf0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -246,7 +246,6 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
     //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
     //}
     // Set class name
-    //vargs.add(QueryMasterRunner.class.getCanonicalName());
     vargs.add(TajoWorker.class.getCanonicalName());
     vargs.add("qm");
     vargs.add(queryId.toString()); // queryId

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index c1afb80..fa2ff14 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryConf;
-import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.ContainerProxy;
@@ -68,15 +66,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
   static AtomicInteger containerIdSeq = new AtomicInteger(0);
   private TajoConf tajoConf;
-  private QueryMasterTask.QueryContext queryContext;
+  private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
   private final ExecutorService executorService;
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
-  public TajoResourceAllocator(QueryMasterTask.QueryContext queryContext) {
-    this.queryContext = queryContext;
+  public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
+    this.queryTaskContext = queryTaskContext;
     executorService = Executors.newFixedThreadPool(
-        queryContext.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+        queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
   }
 
   @Override
@@ -102,9 +100,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
   public void init(Configuration conf) {
     tajoConf = (TajoConf)conf;
 
-    queryContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
+    queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
 //
-    queryContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
+    queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
 
     super.init(conf);
   }
@@ -117,7 +115,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     stopped.set(true);
     executorService.shutdownNow();
 
-    Map<ContainerId, ContainerProxy> containers = queryContext.getResourceAllocator().getContainers();
+    Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers();
     List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
     for(ContainerProxy eachProxy: list) {
       try {
@@ -162,36 +160,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
   }
 
   private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
-    FileSystem fs = null;
-
-    QueryConf queryConf = queryContext.getConf();
-    LOG.info("defaultFS: " + queryConf.get("fs.default.name"));
-    LOG.info("defaultFS: " + queryConf.get("fs.defaultFS"));
-    try {
-      fs = FileSystem.get(queryConf);
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
-
-    try {
-      // TODO move to tajo temp
-      Path warehousePath = new Path(queryConf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR);
-      Path queryConfPath = new Path(warehousePath, executionBlockId.getQueryId().toString());
-      queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
-
-      if(!fs.exists(queryConfPath)){
-        LOG.info("Writing a QueryConf to HDFS and add to local environment, outputPath=" + queryConfPath);
-        writeConf(queryConf, queryConfPath);
-      } else {
-        LOG.warn("QueryConf already exist. path: "  + queryConfPath.toString());
-      }
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
-    //Query in standby mode doesn't need launch Worker.
-    //But, Assign ExecutionBlock to assigned tajo worker
+    // Query in standby mode doesn't need launch Worker.
+    // But, Assign ExecutionBlock to assigned tajo worker
     for(Container eachContainer: containers) {
-      TajoContainerProxy containerProxy = new TajoContainerProxy(queryContext, tajoConf,
+      TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf,
           eachContainer, executionBlockId);
       executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
     }
@@ -213,7 +185,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
   private void stopContainers(Collection<Container> containers) {
     for (Container container : containers) {
-      final ContainerProxy proxy = queryContext.getResourceAllocator().getContainer(container.getId());
+      final ContainerProxy proxy = queryTaskContext.getResourceAllocator().getContainer(container.getId());
       executorService.submit(new StopContainerRunner(container.getId(), proxy));
     }
   }
@@ -248,7 +220,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
     @Override
     public void run() {
-      LOG.info("======> Start TajoWorkerAllocationThread");
+      LOG.info("Start TajoWorkerAllocationThread");
       CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
           new CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse>();
 
@@ -262,7 +234,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               .setExecutionBlockId(event.getExecutionBlockId().getProto())
               .build();
 
-      queryContext.getQueryMasterContext().getWorkerContext().
+      queryTaskContext.getQueryMasterContext().getWorkerContext().
           getTajoMasterRpcClient().allocateWorkerResources(null, request, callBack);
 
       TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
@@ -315,14 +287,14 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           containers.add(container);
         }
 
-        SubQueryState state = queryContext.getSubQuery(executionBlockId).getState();
+        SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getState();
         if (!SubQuery.isRunningState(state)) {
           List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
           for(Container eachContainer: containers) {
             workerResources.add(((TajoWorkerContainer)eachContainer).getWorkerResource());
           }
           try {
-            TajoContainerProxy.releaseWorkerResource(queryContext, executionBlockId, workerResources);
+            TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, workerResources);
           } catch (Exception e) {
             LOG.error(e.getMessage(), e);
           }
@@ -333,7 +305,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           if(LOG.isDebugEnabled()) {
             LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
           }
-          queryContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
+          queryTaskContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
         }
         numAllocatedWorkers += workerHosts.size();
 
@@ -345,7 +317,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
             event.getRequiredNum() - numAllocatedWorkers,
             event.isLeafQuery(), event.getProgress()
         );
-        queryContext.getEventHandler().handle(shortRequestEvent);
+        queryTaskContext.getEventHandler().handle(shortRequestEvent);
 
       }
       LOG.info("Stop TajoWorkerAllocationThread");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 0fc896b..ff25fb3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -21,13 +21,16 @@ package org.apache.tajo.worker;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.CatalogClient;
+import org.apache.tajo.catalog.CatalogConstants;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
@@ -148,9 +151,9 @@ public class TajoWorker extends CompositeService {
 
       tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, managerPort);
       addService(tajoWorkerManagerService);
-      LOG.info("====> Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort=" + managerPort);
+      LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort=" + managerPort);
     } else {
-      LOG.info("====> Tajo worker started: mode=" + daemonMode);
+      LOG.info("Tajo worker started: mode=" + daemonMode);
     }
 
     super.init(conf);
@@ -252,8 +255,7 @@ public class TajoWorker extends CompositeService {
   }
 
   private void setWorkerMode(String[] params) {
-    if("qm".equals(daemonMode)) {
-      //QueryMaster mode
+    if("qm".equals(daemonMode)) { //QueryMaster mode
 
       String tajoMasterAddress = params[2];
       connectToTajoMaster(tajoMasterAddress);
@@ -262,11 +264,9 @@ public class TajoWorker extends CompositeService {
       QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
       tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
           queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
-    } else if("tr".equals(daemonMode)) {
-      //TaskRunner mode
+    } else if("tr".equals(daemonMode)) { //TaskRunner mode
       taskRunnerManager.startTask(params);
-    } else {
-      //Standby mode
+    } else { //Standby mode
       connectToTajoMaster(tajoConf.get("tajo.master.manager.addr"));
       connectToCatalog();
       workerHeartbeatThread = new WorkerHeartbeatThread();
@@ -297,10 +297,10 @@ public class TajoWorker extends CompositeService {
 
   private void connectToCatalog() {
     // TODO: To be improved. it's a hack. It assumes that CatalogServer is embedded in TajoMaster.
-    String hostName = this.tajoMasterAddress.getHostName();
-    int port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS).split(":")[1]);
+    String catalogAddr = tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS);
+    //int port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS).split(":")[1]);
     try {
-      catalogClient = new CatalogClient(hostName, port);
+      catalogClient = new CatalogClient(tajoConf);
     } catch (IOException e) {
       e.printStackTrace();
     }
@@ -489,9 +489,12 @@ public class TajoWorker extends CompositeService {
 
     String workerMode = args[0];
 
+    TajoConf tajoConf = new TajoConf();
+    tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
     try {
       TajoWorker tajoWorker = new TajoWorker(workerMode);
-      tajoWorker.startWorker(new TajoConf(new YarnConfiguration()), args);
+      tajoWorker.startWorker(tajoConf, args);
     } catch (Throwable t) {
       LOG.fatal("Error starting TajoWorker", t);
       System.exit(-1);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index e1fd88a..a77ad2a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -175,8 +175,8 @@ public class TajoWorkerClientService extends AbstractService {
           builder.setSubmitTime(query.getAppSubmitTime());
           builder.setInitTime(query.getInitializationTime());
           builder.setHasResult(
-              !(queryMasterTask.getQueryContext().getQueryMeta().isCreateTable() ||
-                  queryMasterTask.getQueryContext().getQueryMeta().isInsert())
+              !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
+                  queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
           );
           if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
             builder.setFinishTime(query.getFinishTime());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 6fde6e4..28cc5f6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -31,7 +31,7 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.master.TaskSchedulerImpl;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.querymaster.QueryMaster;
@@ -125,13 +125,13 @@ public class TajoWorkerManagerService extends CompositeService
       ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
       QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
       ContainerId cid =
-          queryMasterTask.getQueryContext().getResourceAllocator().makeContainerId(request.getContainerId());
+          queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
 
       if(queryMasterTask == null || queryMasterTask.isStopped()) {
-        LOG.info("====>getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
+        LOG.info("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
         done.run(TaskSchedulerImpl.stopTaskRunnerReq);
       } else {
-        LOG.info("====>getTask:" + cid + ", ebId:" + ebId);
+        LOG.info("getTask:" + cid + ", ebId:" + ebId);
         queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
       }
     } catch (Exception e) {
@@ -202,7 +202,7 @@ public class TajoWorkerManagerService extends CompositeService
       QueryId queryId = new QueryId(request.getQueryId());
       LOG.info("Receive executeQuery request:" + queryId);
       queryMaster.handle(new QueryStartEvent(queryId,
-          new QueryMeta(request.getQueryMeta()), request.getLogicalPlanJson().getValue()));
+          new QueryContext(request.getQueryContext()), request.getLogicalPlanJson().getValue()));
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index aef5ead..e66751c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -30,13 +30,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.exception.UnfinishedTaskException;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlannerUtil;
@@ -48,7 +49,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
 import org.apache.tajo.master.ExecutionBlock.PartitionType;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.StorageUtil;
@@ -69,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class Task {
   private static final Log LOG = LogFactory.getLog(Task.class);
 
-  private final QueryConf conf;
-  private final QueryMeta queryMeta;
+  private final TajoConf systemConf;
+  private final QueryContext queryContext;
   private final FileSystem localFS;
   private final TaskRunner.TaskRunnerContext taskRunnerContext;
   private final Interface masterProxy;
@@ -137,8 +138,8 @@ public class Task {
     this.reporter.startCommunicationThread();
 
     this.taskId = request.getId();
-    this.conf = worker.getQueryConf();
-    this.queryMeta = request.getQueryMeta();
+    this.systemConf = worker.getConf();
+    this.queryContext = request.getQueryContext();
     this.taskRunnerContext = worker;
     this.masterProxy = masterProxy;
     this.localFS = worker.getLocalFS();
@@ -146,7 +147,7 @@ public class Task {
     this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(),
         taskId.getQueryUnitId().getId() + "_" + taskId.getId());
 
-    this.context = new TaskAttemptContext(conf, taskId,
+    this.context = new TaskAttemptContext(systemConf, taskId,
         request.getFragments().toArray(new Fragment[request.getFragments().size()]),
         taskDir);
     plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
@@ -163,7 +164,7 @@ public class Task {
     } else {
       // The final result of a task will be written in a file named part-ss-nnnnnnn,
       // where ss is the subquery id associated with this task, and nnnnnn is the task id.
-      Path outFilePath = new Path(queryMeta.getOutputPath(),
+      Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME,
           OUTPUT_FILE_PREFIX +
           OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
           OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
@@ -201,7 +202,7 @@ public class Task {
     if (request.getFetches().size() > 0) {
       inputTableBaseDir = localFS.makeQualified(
           lDirAllocator.getLocalPathForWrite(
-              getTaskAttemptDir(context.getTaskId()).toString() + "/in", conf));
+              getTaskAttemptDir(context.getTaskId()).toString() + "/in", systemConf));
       localFS.mkdirs(inputTableBaseDir);
       Path tableDir;
       for (String inputTable : context.getInputTables()) {
@@ -451,7 +452,7 @@ public class Task {
 
   private Fragment[] localizeFetchedData(File file, String name, TableMeta meta)
       throws IOException {
-    Configuration c = new Configuration(conf);
+    Configuration c = new Configuration(systemConf);
     c.set("fs.default.name", "file:///");
     FileSystem fs = FileSystem.get(c);
     Path tablePath = new Path(file.getAbsolutePath());
@@ -525,7 +526,7 @@ public class Task {
     if (fetches.size() > 0) {
       Path inputDir = lDirAllocator.
           getLocalPathToRead(
-              getTaskAttemptDir(ctx.getTaskId()).toString() + "/in", conf);
+              getTaskAttemptDir(ctx.getTaskId()).toString() + "/in", systemConf);
       File storeDir;
 
       int i = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 6128bb3..3056595 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -59,7 +59,7 @@ public class TaskRunner extends AbstractService {
   /** class logger */
   private static final Log LOG = LogFactory.getLog(TaskRunner.class);
 
-  private QueryConf queryConf;
+  private TajoConf systemConf;
 
   private volatile boolean stopped = false;
 
@@ -105,7 +105,7 @@ public class TaskRunner extends AbstractService {
 
   private TaskRunnerManager taskRunnerManager;
 
-  public TaskRunner(TaskRunnerManager taskRunnerManager, QueryConf conf, String[] args) {
+  public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
     super(TaskRunner.class.getName());
 
     this.taskRunnerManager = taskRunnerManager;
@@ -130,8 +130,7 @@ public class TaskRunner extends AbstractService {
       LOG.info("QueryMaster Address:" + masterAddr);
       // TODO - 'load credential' should be implemented
       // Getting taskOwner
-      UserGroupInformation taskOwner =
-          UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
+      UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.TAJO_USERNAME));
       //taskOwner.addToken(token);
 
       // initialize MasterWorkerProtocol as an actual task owner.
@@ -161,16 +160,15 @@ public class TaskRunner extends AbstractService {
 
   @Override
   public void init(Configuration conf) {
-    this.queryConf = (QueryConf)conf;
+    this.systemConf = (TajoConf)conf;
 
     try {
       // initialize DFS and LocalFileSystems
-      defaultFS = FileSystem.get(URI.create(queryConf.getVar(ConfVars.ROOT_DIR)),conf);
+      defaultFS = TajoConf.getTajoRootPath(systemConf).getFileSystem(conf);
       localFS = FileSystem.getLocal(conf);
 
       // the base dir for an output dir
-      baseDir = queryId.toString()
-          + "/output" + "/" + executionBlockId.getId();
+      baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId();
 
       // initialize LocalDirAllocator
       lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
@@ -180,7 +178,7 @@ public class TaskRunner extends AbstractService {
 
       // Setup QueryEngine according to the query plan
       // Here, we can setup row-based query engine or columnar query engine.
-      this.queryEngine = new TajoQueryEngine(queryConf);
+      this.queryEngine = new TajoQueryEngine(systemConf);
     } catch (Throwable t) {
       LOG.error(t);
     }
@@ -222,8 +220,8 @@ public class TaskRunner extends AbstractService {
   }
 
   public class TaskRunnerContext {
-    public QueryConf getQueryConf() {
-      return queryConf;
+    public TajoConf getConf() {
+      return systemConf;
     }
 
     public String getNodeId() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 5d8fa8d..f1ca567 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -22,7 +22,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.conf.TajoConf;
 
 import java.util.HashMap;
@@ -88,13 +87,13 @@ public class TaskRunnerManager extends CompositeService {
     Thread t = new Thread() {
       public void run() {
         try {
-          QueryConf queryConf = new QueryConf(tajoConf);
-          TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, queryConf, params);
+          TajoConf systemConf = new TajoConf(tajoConf);
+          TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, systemConf, params);
           LOG.info("Start TaskRunner:" + taskRunner.getId());
           synchronized(taskRunnerMap) {
             taskRunnerMap.put(taskRunner.getId(), taskRunner);
           }
-          taskRunner.init(queryConf);
+          taskRunner.init(systemConf);
           taskRunner.start();
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
index 2897b14..8fc3884 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.client.YarnClient;
 import org.apache.hadoop.yarn.client.YarnClientImpl;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerLauncher;
@@ -47,12 +46,12 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
 
   private static final Log LOG = LogFactory.getLog(YarnResourceAllocator.class.getName());
 
-  private QueryMasterTask.QueryContext queryContext;
+  private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
 
-  private QueryConf queryConf;
+  private TajoConf systemConf;
 
-  public YarnResourceAllocator(QueryMasterTask.QueryContext queryContext) {
-    this.queryContext = queryContext;
+  public YarnResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
+    this.queryTaskContext = queryTaskContext;
   }
 
   @Override
@@ -75,19 +74,19 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
 
   @Override
   public void init(Configuration conf) {
-    queryConf = (QueryConf)conf;
+    systemConf = (TajoConf)conf;
 
-    yarnRPC = YarnRPC.create(queryConf);
+    yarnRPC = YarnRPC.create(systemConf);
 
     connectYarnClient();
 
-    taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryContext, yarnRPC);
+    taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryTaskContext, yarnRPC);
     addService((org.apache.hadoop.yarn.service.Service) taskRunnerLauncher);
-    queryContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
+    queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
 
-    rmAllocator = new YarnRMContainerAllocator(queryContext);
+    rmAllocator = new YarnRMContainerAllocator(queryTaskContext);
     addService(rmAllocator);
-    queryContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
+    queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
     super.init(conf);
   }
 
@@ -108,7 +107,7 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
 
   private void connectYarnClient() {
     this.yarnClient = new YarnClientImpl();
-    this.yarnClient.init(queryConf);
+    this.yarnClient.init(systemConf);
     this.yarnClient.start();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 0694b4e..94bacdf 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -59,7 +59,7 @@ message QueryUnitRequestProto {
     optional bool interQuery = 6 [default = false];
     repeated Fetch fetches = 7;
     optional bool shouldDie = 8;
-    optional KeyValueSetProto queryMeta = 9;
+    optional KeyValueSetProto queryContext = 9;
 }
 
 message Fetch {
@@ -105,7 +105,7 @@ message Partition {
 
 message QueryExecutionRequestProto {
     required QueryIdProto queryId = 1;
-    required KeyValueSetProto queryMeta = 2;
+    required KeyValueSetProto queryContext = 2;
     required StringProto logicalPlanJson = 3;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index 2025110..a4e39ff 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -32,6 +32,11 @@
   </property>
 
   <property>
+    <name>tajo.staging.root.dir</name>
+    <value>/tmp/tajo-${user.name}/staging</value>
+  </property>
+
+  <property>
     <name>tajo.task.localdir</name>
     <value>/tmp/tajo-localdir</value>
   </property>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index e629623..5a85a07 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -21,11 +21,9 @@
  */
 package org.apache.tajo;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -33,17 +31,10 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.query.ResultSetImpl;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.TUtil;
 
-import java.io.File;
 import java.io.IOException;
-import java.sql.ResultSet;
-import java.util.List;
 import java.util.UUID;
 
 public class BackendTestingUtil {
@@ -102,36 +93,12 @@ public class BackendTestingUtil {
 
   public BackendTestingUtil(TajoConf conf) throws IOException {
     this.conf = conf;
-    this.catalog = new LocalCatalog(conf);
+    this.catalog = new LocalCatalogWrapper(conf);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
     optimizer = new LogicalOptimizer();
   }
 
-  public ResultSet run(String [] tableNames, File [] tables, Schema [] schemas, String query)
-      throws IOException, PlanningException {
-    Path workDir = createTmpTestDir();
-    StorageManager sm = StorageManager.get(new TajoConf(), workDir);
-    List<Fragment> frags = Lists.newArrayList();
-    for (int i = 0; i < tableNames.length; i++) {
-      Fragment [] splits = sm.split(tableNames[i], new Path(tables[i].getAbsolutePath()));
-      for (Fragment f : splits) {
-        frags.add(f);
-      }
-    }
-
-    TaskAttemptContext ctx = new TaskAttemptContext(conf,
-        TUtil.newQueryUnitAttemptId(),
-        frags.toArray(new Fragment[frags.size()]), workDir);
-    Expr EXPR = analyzer.parse(query);
-    LogicalPlan plan = planner.createPlan(EXPR);
-    LogicalNode rootNode = optimizer.optimize(plan);
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
-    return new ResultSetImpl(null, null, conf, new Path(workDir, "out"));
-  }
-
   public static Path createTmpTestDir() throws IOException {
     String randomStr = UUID.randomUUID().toString();
     FileSystem fs = FileSystem.getLocal(new Configuration());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
index 95dc212..3a70ac5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -39,9 +39,11 @@ import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.pullserver.PullServerAuxService;
+import org.apache.tajo.util.NetUtils;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 /**
  * Configures and starts the Tajo-specific components in the YARN cluster.
@@ -64,6 +66,10 @@ public class MiniTajoYarnCluster extends MiniYARNCluster {
 
   @Override
   public void init(Configuration conf) {
+
+    conf.setSocketAddr(YarnConfiguration.RM_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
+    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
+
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
     if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
       conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0adf22a..88029ea 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
@@ -71,8 +72,7 @@ public class TajoTestingCluster {
 	 * System property key to get test directory value.
 	 * Name is as it is because mini dfs has hard-codings to put test data here.
 	 */
-	public static final String TEST_DIRECTORY_KEY =
-      MiniDFSCluster.PROP_TEST_BUILD_DATA;
+	public static final String TEST_DIRECTORY_KEY = MiniDFSCluster.PROP_TEST_BUILD_DATA;
 
 	/**
 	 * Default parent directory for test output.
@@ -222,7 +222,7 @@ public class TajoTestingCluster {
     }
 
     conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
-    conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/tcat/db");
+    conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db");
     LOG.info("Apache Derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
     conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
 
@@ -255,7 +255,7 @@ public class TajoTestingCluster {
 
     c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
     c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
-    c.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/tcat/db");
+    c.set(CatalogConstants.JDBC_URI, "jdbc:derby:" + testBuildDir.getAbsolutePath() + "/db");
 
     LOG.info("derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
 
@@ -300,7 +300,7 @@ public class TajoTestingCluster {
 
       tajoWorker.startWorker(workerConf, new String[]{"standby"});
 
-      LOG.info("=====> MiniTajoCluster Worker #" + (i + 1) + " started.");
+      LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started.");
       tajoWorkers.add(tajoWorker);
     }
   }
@@ -348,14 +348,10 @@ public class TajoTestingCluster {
    */
   public void startMiniCluster(final int numSlaves)
       throws Exception {
-    String localHostName = InetAddress.getLocalHost().getHostName();
-    startMiniCluster(numSlaves, new String[] {localHostName});
+    startMiniCluster(numSlaves, null);
   }
 
-  public void startMiniCluster(final int numSlaves,
-                                          final String [] dataNodeHosts) throws Exception {
-    // the conf is set to the distributed mode.
-    this.conf.setBoolVar(ConfVars.CLUSTER_DISTRIBUTED, true);
+  public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception {
 
     int numDataNodes = numSlaves;
     if(dataNodeHosts != null && dataNodeHosts.length != 0) {
@@ -406,12 +402,11 @@ public class TajoTestingCluster {
       yarnCluster.init(conf);
       yarnCluster.start();
 
-      conf.set(YarnConfiguration.RM_ADDRESS,
-          NetUtils.normalizeInetSocketAddress(yarnCluster.getResourceManager().
-              getClientRMService().getBindAddress()));
-      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          NetUtils.normalizeInetSocketAddress(yarnCluster.getResourceManager().
-              getApplicationMasterService().getBindAddress()));
+      ResourceManager resourceManager = yarnCluster.getResourceManager();
+      InetSocketAddress rmAddr = resourceManager.getClientRMService().getBindAddress();
+      InetSocketAddress rmSchedulerAddr = resourceManager.getApplicationMasterService().getBindAddress();
+      conf.set(YarnConfiguration.RM_ADDRESS, NetUtils.normalizeInetSocketAddress(rmAddr));
+      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, NetUtils.normalizeInetSocketAddress(rmSchedulerAddr));
 
       URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
       if (url == null) {
@@ -425,9 +420,6 @@ public class TajoTestingCluster {
   }
 
   public void startMiniClusterInLocal(final int numSlaves) throws Exception {
-    // the conf is set to the distributed mode.
-    this.conf.setBoolVar(ConfVars.CLUSTER_DISTRIBUTED, true);
-
     // If we already put up a cluster, fail.
     String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
     isRunningCluster(testBuildPath);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
index a16d0f3..ba7d36b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
@@ -22,15 +22,13 @@
 package org.apache.tajo.engine.query;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -52,6 +50,7 @@ public class TestResultSetImpl {
   private static TajoTestingCluster util;
   private static TajoConf conf;
   private static StorageManager sm;
+  private static TableDesc desc;
   private static TableMeta scoreMeta;
 
   @BeforeClass
@@ -90,7 +89,7 @@ public class TestResultSetImpl {
     stat.setNumBlocks(1000);
     stat.setNumPartitions(100);
     scoreMeta.setStat(stat);
-    sm.writeTableMeta(sm.getTablePath("score"), scoreMeta);
+    desc = new TableDescImpl("score", scoreMeta, p);
   }
 
   @AfterClass
@@ -100,7 +99,7 @@ public class TestResultSetImpl {
 
   @Test
   public void test() throws IOException, SQLException {
-    ResultSetImpl rs = new ResultSetImpl(null, null, conf, sm.getTablePath("score"));
+    ResultSetImpl rs = new ResultSetImpl(null, null, conf, desc);
     ResultSetMetaData meta = rs.getMetaData();
     assertNotNull(meta);
     Schema schema = scoreMeta.getSchema();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
index aedaac3..7012cd9 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
@@ -32,6 +32,11 @@
   </property>
 
   <property>
+    <name>tajo.staging.root.dir</name>
+    <value>/tmp/tajo-${user.name}/staging</value>
+  </property>
+
+  <property>
     <name>tajo.task.localdir</name>
     <value>/tmp/tajo-localdir</value>
   </property>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-pullserver/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-pullserver/src/main/resources/tajo-default.xml
deleted file mode 100644
index 72f0bb9..0000000
--- a/tajo-core/tajo-core-pullserver/src/main/resources/tajo-default.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<configuration>
-  <property>
-    <name>tajo.cluster.distributed</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <name>tajo.rootdir</name>
-    <value>file:///tmp/tajo-${user.name}</value>
-    <description>A base for other temporary directories.</description>
-  </property>
-</configuration>