You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/08/11 05:53:15 UTC

[4/5] TAJO-928: Session variables should override query configs in TajoConf.

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
index 4d1cee1..0c3db6d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
@@ -19,10 +19,13 @@
 package org.apache.tajo.util;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.OverridableConf;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.json.CommonGsonHelper;
 import org.apache.tajo.json.GsonObject;
+import sun.misc.FloatingDecimal;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -32,6 +35,9 @@ import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
 
 public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, GsonObject {
+  public static final String TRUE_STR = "true";
+  public static final String FALSE_STR = "false";
+
 	private KeyValueSetProto.Builder builder = KeyValueSetProto.newBuilder();
 	
 	@Expose private Map<String,String> keyVals;
@@ -63,40 +69,133 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
   public int size() {
     return keyVals.size();
   }
-	
-	public void put(String key, String val) {
-		this.keyVals.put(key, val);
-	}
 
   public void putAll(Map<String, String> keyValues) {
     if (keyValues != null) {
       this.keyVals.putAll(keyValues);
     }
   }
-	
-	public void putAll(KeyValueSet keyValueSet) {
+
+  public void putAll(KeyValueSet keyValueSet) {
     if (keyValueSet != null) {
-	    this.keyVals.putAll(keyValueSet.keyVals);
+      this.keyVals.putAll(keyValueSet.keyVals);
     }
-	}
-	
-	public String get(String key) {
-		return this.keyVals.get(key);
-	}
-	
-	public String get(String key, String defaultVal) {
-	  if(keyVals.containsKey(key))
-	    return keyVals.get(key);
-	  else {
-	    return defaultVal;
-	  }
-	}
-	
-	public Map<String,String> getAllKeyValus() {
-	  return keyVals;
-	}
+  }
+
+  public Map<String,String> getAllKeyValus() {
+    return keyVals;
+  }
+
+  public boolean containsKey(String key) {
+    return this.keyVals.containsKey(key);
+  }
+
+  public void set(String key, String val) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(val);
+
+    this.keyVals.put(key, val);
+  }
+
+  /** Looks up {@code key}; falls back to a non-null {@code defaultVal}, otherwise throws IllegalArgumentException. */
+  public String get(String key, String defaultVal) {
+    if (keyVals.containsKey(key)) {
+      return keyVals.get(key);
+    } else if (defaultVal == null) {
+      throw new IllegalArgumentException("No such a config key: "  + key);
+    }
+    return defaultVal;
+  }
+
+  public String get(String key) {
+    return get(key, null);
+  }
+
+  public void setBool(String key, boolean val) {
+    set(key, val ? TRUE_STR : FALSE_STR);
+  }
+
+  /**
+   * Reads {@code key} as a boolean: only a case-insensitive "true" value yields true.
+   * A missing key yields {@code defaultVal}, or false when {@code defaultVal} is null.
+   */
+  public boolean getBool(String key, Boolean defaultVal) {
+    if (!containsKey(key)) {
+      return defaultVal != null && defaultVal;
+    }
+    return TRUE_STR.equalsIgnoreCase(get(key, null));
+  }
+
+  public boolean getBool(String key) {
+    return getBool(key, null);
+  }
+
+  public void setInt(String key, int val) {
+    set(key, String.valueOf(val));
+  }
+
+  /** Parses the value under {@code key} as int; a missing key yields {@code defaultVal}, or throws if that is null. */
+  public int getInt(String key, Integer defaultVal) {
+    if (containsKey(key)) {
+      return Integer.parseInt(get(key, null));
+    }
+    if (defaultVal == null) {
+      throw new IllegalArgumentException("No such a config key: "  + key);
+    }
+    return defaultVal;
+  }
+
+  public int getInt(String key) {
+    return getInt(key, null);
+  }
+
+  public void setLong(String key, long val) {
+    set(key, String.valueOf(val));
+  }
+
+  /** Parses the value under {@code key} as long; a missing key yields {@code defaultVal}, or throws if that is null. */
+  public long getLong(String key, Long defaultVal) {
+    if (containsKey(key)) {
+      return Long.parseLong(get(key, null));
+    }
+    if (defaultVal == null) {
+      throw new IllegalArgumentException("No such a config key: "  + key);
+    }
+    return defaultVal;
+  }
+
+  public long getLong(String key) {
+    return getLong(key, null);
+  }
+
+  public void setFloat(String key, float val) {
+    set(key, String.valueOf(val));
+  }
+
+  /** Parses the value under {@code key} as float; a missing key yields {@code defaultVal}, or throws if that is null. */
+  public float getFloat(String key, Float defaultVal) {
+    if (containsKey(key)) {
+      String strVal = get(key, null);
+      try {
+        // Range-check through double using public JDK parsing; reject magnitudes beyond float on either sign.
+        if (Float.MAX_VALUE < Math.abs(Double.parseDouble(strVal))) {
+          throw new IllegalStateException("Parsed value is overflow in float type");
+        }
+        return Float.parseFloat(strVal);
+      } catch (NumberFormatException nfe) {
+        throw new IllegalArgumentException("Invalid float value for config key " + key + ": " + strVal, nfe);
+      }
+    } else if (defaultVal != null) {
+      return defaultVal.floatValue();
+    }
+    throw new IllegalArgumentException("No such a config key: "  + key);
+  }
+
+  public float getFloat(String key) {
+    return getFloat(key, null);
+  }
 	
-	public String delete(String key) {
+	public String remove(String key) {
 		return keyVals.remove(key);
 	}
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java
index adf80b0..42623bd 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestArithmeticOperator.java
@@ -52,9 +52,9 @@ public class TestArithmeticOperator {
   public void setUp() {
     TajoConf tajoConf = new TajoConf();
     if ("Zero_Exception".equals(option)) {
-      tajoConf.setBoolVar(ConfVars.BEHAVIOR_ARITHMETIC_ABORT, true);
+      tajoConf.setBoolVar(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT, true);
     } else {
-      tajoConf.setBoolVar(ConfVars.BEHAVIOR_ARITHMETIC_ABORT, false);
+      tajoConf.setBoolVar(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT, false);
     }
     Datum.initAbortWhenDivideByZero(tajoConf);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index 3ffeeb0..a43cc1a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -35,7 +36,7 @@ import org.apache.tajo.engine.planner.logical.join.GreedyHeuristicJoinOrderAlgor
 import org.apache.tajo.engine.planner.logical.join.JoinGraph;
 import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm;
 import org.apache.tajo.engine.planner.rewrite.*;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.engine.query.QueryContext;
 
 import java.util.LinkedHashSet;
 import java.util.Set;
@@ -57,7 +58,7 @@ public class LogicalOptimizer {
 
   public LogicalOptimizer(TajoConf systemConf) {
     rulesBeforeJoinOpt = new BasicQueryRewriteEngine();
-    if (systemConf.getBoolVar(ConfVars.PLANNER_USE_FILTER_PUSHDOWN)) {
+    if (systemConf.getBoolVar(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED)) {
       rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule());
     }
 
@@ -84,13 +85,13 @@ public class LogicalOptimizer {
     return optimize(null, plan);
   }
 
-  public LogicalNode optimize(Session session, LogicalPlan plan) throws PlanningException {
+  public LogicalNode optimize(QueryContext context, LogicalPlan plan) throws PlanningException {
     rulesBeforeJoinOpt.rewrite(plan);
 
     DirectedGraphCursor<String, BlockEdge> blockCursor =
         new DirectedGraphCursor<String, BlockEdge>(plan.getQueryBlockGraph(), plan.getRootBlock().getName());
 
-    if (session == null || "true".equals(session.getVariable(ConfVars.OPTIMIZER_JOIN_ENABLE.varname, "true"))) {
+    if (context == null || context.getBool(SessionVars.TEST_JOIN_OPT_ENABLED)) {
       // default is true
       while (blockCursor.hasNext()) {
         optimizeJoinOrder(plan, blockCursor.nextBlock());

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 86bacef..ee65b2b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -67,10 +67,8 @@ public class LogicalPlan {
   LogicalPlanner planner;
 
   private boolean isExplain;
-  private final String currentDatabase;
 
-  public LogicalPlan(String currentDatabase, LogicalPlanner planner) {
-    this.currentDatabase = currentDatabase;
+  public LogicalPlan(LogicalPlanner planner) {
     this.planner = planner;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
index 84fe6c2..6ee0ff8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
@@ -26,10 +26,10 @@ import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.exception.NoSuchColumnException;
 import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode;
 import org.apache.tajo.engine.planner.nameresolver.NameResolver;
+import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.master.session.Session;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;
@@ -42,18 +42,18 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPrepr
   private ExprAnnotator annotator;
 
   public static class PreprocessContext {
-    public Session session;
+    public QueryContext queryContext;
     public LogicalPlan plan;
     public LogicalPlan.QueryBlock currentBlock;
 
-    public PreprocessContext(Session session, LogicalPlan plan, LogicalPlan.QueryBlock currentBlock) {
-      this.session = session;
+    public PreprocessContext(QueryContext queryContext, LogicalPlan plan, LogicalPlan.QueryBlock currentBlock) {
+      this.queryContext = queryContext;
       this.plan = plan;
       this.currentBlock = currentBlock;
     }
 
     public PreprocessContext(PreprocessContext context, LogicalPlan.QueryBlock currentBlock) {
-      this.session = context.session;
+      this.queryContext = context.queryContext;
       this.plan = context.plan;
       this.currentBlock = currentBlock;
     }
@@ -104,7 +104,7 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPrepr
       if (CatalogUtil.isFQTableName(asteriskExpr.getQualifier())) {
         qualifier = asteriskExpr.getQualifier();
       } else {
-        qualifier = CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), asteriskExpr.getQualifier());
+        qualifier = CatalogUtil.buildFQName(ctx.queryContext.getCurrentDatabase(), asteriskExpr.getQualifier());
       }
 
       relationOp = block.getRelation(qualifier);
@@ -359,7 +359,7 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPrepr
     if (CatalogUtil.isFQTableName(expr.getName())) {
       actualRelationName = relation.getName();
     } else {
-      actualRelationName = CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), relation.getName());
+      actualRelationName = CatalogUtil.buildFQName(ctx.queryContext.getCurrentDatabase(), relation.getName());
     }
 
     TableDesc desc = catalog.getTableDesc(actualRelationName);
@@ -388,7 +388,7 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPrepr
 
     // a table subquery should be dealt as a relation.
     TableSubQueryNode node = ctx.plan.createNode(TableSubQueryNode.class);
-    node.init(CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), expr.getName()), child);
+    node.init(CatalogUtil.buildFQName(ctx.queryContext.getCurrentDatabase(), expr.getName()), child);
     ctx.currentBlock.addRelation(node);
     return node;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
index bb8192f..6512ae0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
@@ -24,7 +24,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.engine.query.QueryContext;
 
 import java.util.Stack;
 
@@ -38,17 +38,17 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<LogicalPlanVeri
   }
 
   public static class Context {
-    Session session;
+    QueryContext queryContext;
     VerificationState state;
 
-    public Context(Session session, VerificationState state) {
-      this.session = session;
+    public Context(QueryContext queryContext, VerificationState state) {
+      this.queryContext = queryContext; // was a self-assignment, which left the field null
       this.state = state;
     }
   }
 
-  public VerificationState verify(Session session, VerificationState state, LogicalPlan plan) throws PlanningException {
-    Context context = new Context(session, state);
+  public VerificationState verify(QueryContext queryContext, VerificationState state, LogicalPlan plan) throws PlanningException {
+    Context context = new Context(queryContext, state);
     visit(context, plan, plan.getRootBlock());
     return context.state;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index a4820cb..35df11f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -42,17 +42,16 @@ import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.nameresolver.NameResolvingMode;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.master.session.Session;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;
 
 import static org.apache.tajo.algebra.CreateTable.PartitionType;
-
 import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult;
 import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
 import static org.apache.tajo.engine.planner.LogicalPlanPreprocessor.PreprocessContext;
@@ -75,7 +74,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   public static class PlanContext {
-    Session session;
+    QueryContext queryContext;
     LogicalPlan plan;
 
     // transient data for each query block
@@ -83,15 +82,15 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     boolean debugOrUnitTests;
 
-    public PlanContext(Session session, LogicalPlan plan, QueryBlock block, boolean debugOrUnitTests) {
-      this.session = session;
+    public PlanContext(QueryContext context, LogicalPlan plan, QueryBlock block, boolean debugOrUnitTests) {
+      this.queryContext = context;
       this.plan = plan;
       this.queryBlock = block;
       this.debugOrUnitTests = debugOrUnitTests;
     }
 
     public PlanContext(PlanContext context, QueryBlock block) {
-      this.session = context.session;
+      this.queryContext = context.queryContext;
       this.plan = context.plan;
       this.queryBlock = block;
       this.debugOrUnitTests = context.debugOrUnitTests;
@@ -109,21 +108,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * @param expr A relational algebraic expression for a query.
    * @return A logical plan
    */
-  public LogicalPlan createPlan(Session session, Expr expr) throws PlanningException {
-    return createPlan(session, expr, false);
+  public LogicalPlan createPlan(QueryContext context, Expr expr) throws PlanningException {
+    return createPlan(context, expr, false);
   }
 
   @VisibleForTesting
-  public LogicalPlan createPlan(Session session, Expr expr, boolean debug) throws PlanningException {
+  public LogicalPlan createPlan(QueryContext queryContext, Expr expr, boolean debug) throws PlanningException {
 
-    LogicalPlan plan = new LogicalPlan(session.getCurrentDatabase(), this);
+    LogicalPlan plan = new LogicalPlan(this);
 
     QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
-    PreprocessContext preProcessorCtx = new PreprocessContext(session, plan, rootBlock);
+    PreprocessContext preProcessorCtx = new PreprocessContext(queryContext, plan, rootBlock);
     preprocessor.visit(preProcessorCtx, new Stack<Expr>(), expr);
     plan.resetGeneratedId();
 
-    PlanContext context = new PlanContext(session, plan, plan.getRootBlock(), debug);
+    PlanContext context = new PlanContext(queryContext, plan, plan.getRootBlock(), debug);
     LogicalNode topMostNode = this.visit(context, new Stack<Expr>(), expr);
 
     // Add Root Node
@@ -1424,7 +1423,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       databaseName = CatalogUtil.extractQualifier(expr.getTableName());
       tableName = CatalogUtil.extractSimpleName(expr.getTableName());
     } else {
-      databaseName = context.session.getCurrentDatabase();
+      databaseName = context.queryContext.getCurrentDatabase();
       tableName = expr.getTableName();
     }
     TableDesc desc = catalog.getTableDesc(databaseName, tableName);
@@ -1624,7 +1623,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     if (CatalogUtil.isFQTableName(parentTableName) == false) {
       parentTableName =
-	CatalogUtil.buildFQName(context.session.getCurrentDatabase(),
+	CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(),
 				parentTableName);
     }
     TableDesc parentTableDesc = catalog.getTableDesc(parentTableName);
@@ -1657,7 +1656,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       createTableNode.setTableName(expr.getTableName());
     } else {
       createTableNode.setTableName(
-          CatalogUtil.buildFQName(context.session.getCurrentDatabase(), expr.getTableName()));
+          CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), expr.getTableName()));
     }
     // This is CREATE TABLE <tablename> LIKE <parentTable>
     if(expr.getLikeParentTableName() != null)
@@ -1753,7 +1752,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       CreateTable.ColumnPartition partition = (CreateTable.ColumnPartition) expr;
       String partitionExpression = Joiner.on(',').join(partition.getColumns());
 
-      partitionMethodDesc = new PartitionMethodDesc(context.session.getCurrentDatabase(), tableName,
+      partitionMethodDesc = new PartitionMethodDesc(context.queryContext.getCurrentDatabase(), tableName,
           CatalogProtos.PartitionType.COLUMN, partitionExpression, convertColumnsToSchema(partition.getColumns()));
     } else {
       throw new PlanningException(String.format("Not supported PartitonType: %s", expr.getPartitionType()));
@@ -1816,7 +1815,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     if (CatalogUtil.isFQTableName(dropTable.getTableName())) {
       qualified = dropTable.getTableName();
     } else {
-      qualified = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), dropTable.getTableName());
+      qualified = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), dropTable.getTableName());
     }
     dropTableNode.init(qualified, dropTable.isIfExists(), dropTable.isPurge());
     return dropTableNode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 6678e46..9f533e2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ObjectArrays;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -37,6 +38,7 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.*;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
@@ -57,7 +59,6 @@ import java.util.Stack;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
-import static org.apache.tajo.conf.TajoConf.ConfVars;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
@@ -68,7 +69,6 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
 public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
   private static final int UNGENERATED_PID = -1;
-  private final long INNER_JOIN_INMEMORY_HASH_THRESHOLD;
 
   protected final TajoConf conf;
   protected final AbstractStorageManager sm;
@@ -76,8 +76,6 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
     this.conf = conf;
     this.sm = sm;
-
-    this.INNER_JOIN_INMEMORY_HASH_THRESHOLD = conf.getLongVar(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);
   }
 
   public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan)
@@ -258,7 +256,16 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       throws IOException {
     String [] lineage = PlannerUtil.getRelationLineage(node);
     long volume = estimateSizeRecursive(context, lineage);
-    boolean inMemoryInnerJoinFlag = volume <= INNER_JOIN_INMEMORY_HASH_THRESHOLD;
+    boolean inMemoryInnerJoinFlag = false;
+
+    QueryContext queryContext = context.getQueryContext();
+
+    if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) {
+      inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT);
+    } else {
+      inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+    }
+
     LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
         context.getTaskId().toString(),
         (left ? "Left" : "Right"),
@@ -470,8 +477,17 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                                    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
     long rightTableVolume = estimateSizeRecursive(context, rightLineage);
+    boolean hashJoin;
 
-    if (rightTableVolume < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
+    QueryContext queryContext = context.getQueryContext();
+
+    if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
+      hashJoin = rightTableVolume <  queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
+    } else {
+      hashJoin = rightTableVolume <  queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+    }
+
+    if (hashJoin) {
       // we can implement left outer join using hash join, using the right operand as the build relation
       LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
       return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
@@ -488,8 +504,18 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
     // blocking, but merge join is blocking as well)
     String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
-    long outerSize = estimateSizeRecursive(context, outerLineage4);
-    if (outerSize < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
+    long leftTableVolume = estimateSizeRecursive(context, outerLineage4);
+    boolean hashJoin;
+
+    QueryContext queryContext = context.getQueryContext();
+
+    if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
+      hashJoin = leftTableVolume <  queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
+    } else {
+      hashJoin = leftTableVolume <  queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
+    }
+
+    if (hashJoin){
       LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
       return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
     } else {
@@ -971,7 +997,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
     long estimatedSize = estimateSizeRecursive(context, outerLineage);
-    final long threshold = conf.getLongVar(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
+    final long threshold = context.getQueryContext().getLong(SessionVars.HASH_GROUPBY_SIZE_LIMIT);
 
     // if the relation size is less than the threshold,
     // the hash aggregation will be used.

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 2d6c095..75dcc18 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -24,7 +24,7 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.util.TUtil;
 
 import java.util.Set;
@@ -38,17 +38,17 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe
   }
 
   public static class Context {
-    Session session;
+    QueryContext queryContext;
     VerificationState state;
 
-    public Context(Session session, VerificationState state) {
-      this.session = session;
+    public Context(QueryContext queryContext, VerificationState state) {
+      this.queryContext = queryContext;
       this.state = state;
     }
   }
 
-  public VerificationState verify(Session session, VerificationState state, Expr expr) throws PlanningException {
-    Context context = new Context(session, state);
+  public VerificationState verify(QueryContext queryContext, VerificationState state, Expr expr) throws PlanningException {
+    Context context = new Context(queryContext, state);
     visit(context, new Stack<Expr>(), expr);
     return context.state;
   }
@@ -127,7 +127,7 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe
     if (CatalogUtil.isFQTableName(tableName)) {
       qualifiedName = tableName;
     } else {
-      qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName);
+      qualifiedName = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), tableName);
     }
 
     if (!catalog.existsTable(qualifiedName)) {
@@ -143,7 +143,10 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe
     if (CatalogUtil.isFQTableName(tableName)) {
       qualifiedName = tableName;
     } else {
-      qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(), tableName);
+      qualifiedName = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(), tableName);
+    }
+    if(qualifiedName == null) {
+      System.out.println("A");
     }
     if (catalog.existsTable(qualifiedName)) {
       context.state.addVerification(String.format("relation \"%s\" already exists", qualifiedName));
@@ -246,7 +249,7 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <PreLogicalPlanVe
         if (expr.hasTableName()) {
           String qualifiedName = expr.getTableName();
           if (TajoConstants.EMPTY_STRING.equals(CatalogUtil.extractQualifier(expr.getTableName()))) {
-            qualifiedName = CatalogUtil.buildFQName(context.session.getCurrentDatabase(),
+            qualifiedName = CatalogUtil.buildFQName(context.queryContext.getCurrentDatabase(),
                 expr.getTableName());
           }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 2daf799..432589b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -120,9 +121,8 @@ public class GlobalPlanner {
     LogicalNode inputPlan = PlannerUtil.clone(masterPlan.getLogicalPlan(),
         masterPlan.getLogicalPlan().getRootBlock().getRoot());
 
-    boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO);
-    if (autoBroadcast) {
-
+    boolean broadcastEnabled = masterPlan.getContext().getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED);
+    if (broadcastEnabled) {
       // pre-visit the master plan in order to find tables to be broadcasted
       // this visiting does not make any execution block and change plan.
       BroadcastJoinMarkCandidateVisitor markCandidateVisitor = new BroadcastJoinMarkCandidateVisitor();
@@ -268,11 +268,11 @@ public class GlobalPlanner {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
-    boolean autoBroadcast = conf.getBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO);
-    long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD);
+    boolean broadcastEnabled = context.getPlan().getContext().getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED);
+    long broadcastTableSizeLimit = context.getPlan().getContext().getLong(SessionVars.BROADCAST_TABLE_SIZE_LIMIT);
 
     // to check when the tajo.dist-query.join.broadcast.auto property is true
-    if (autoBroadcast && joinNode.isCandidateBroadcast()) {
+    if (broadcastEnabled && joinNode.isCandidateBroadcast()) {
       LogicalNode leftNode = joinNode.getLeftChild();
       LogicalNode rightNode = joinNode.getRightChild();
 
@@ -293,7 +293,7 @@ public class GlobalPlanner {
       // Checking Left Side of Join
       if (ScanNode.isScanNode(leftNode)) {
         ScanNode scanNode = (ScanNode)leftNode;
-        if (joinNode.getJoinType() == JoinType.LEFT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) {
+        if (joinNode.getJoinType() == JoinType.LEFT_OUTER || getTableVolume(scanNode) >= broadcastTableSizeLimit) {
           numLargeTables++;
         } else {
           leftBroadcast = true;
@@ -306,7 +306,7 @@ public class GlobalPlanner {
       // Checking Right Side OF Join
       if (ScanNode.isScanNode(rightNode)) {
         ScanNode scanNode = (ScanNode)rightNode;
-        if (joinNode.getJoinType() == JoinType.RIGHT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) {
+        if (joinNode.getJoinType() == JoinType.RIGHT_OUTER || getTableVolume(scanNode) >= broadcastTableSizeLimit) {
           numLargeTables++;
         } else {
           rightBroadcast = true;
@@ -331,7 +331,7 @@ public class GlobalPlanner {
         }
         JoinNode broadcastJoinNode = (JoinNode)eachNode;
         ScanNode scanNode = broadcastJoinNode.getRightChild();
-        if (getTableVolume(scanNode) < broadcastThreshold) {
+        if (getTableVolume(scanNode) < broadcastTableSizeLimit) {
           broadcastTargetScanNodes.add(scanNode);
           blockJoinNode = broadcastJoinNode;
           LOG.info("The table " + scanNode.getCanonicalName() + " ("

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index f714758..31cb3b6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -38,6 +39,7 @@ import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -111,7 +113,7 @@ public class ExternalSortExec extends SortExec {
       throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2");
     }
     // TODO - sort buffer and core num should be changed to use the allocated container resource.
-    this.sortBufferBytesNum = context.getConf().getLongVar(ConfVars.EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE) * 1048576L;
+    this.sortBufferBytesNum = context.getQueryContext().getLong(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB;
     this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
     this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
     this.inMemoryTable = new ArrayList<Tuple>(100000);

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index b1d0400..e73cc2f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -18,9 +18,9 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
 import org.apache.tajo.storage.Appender;
@@ -59,7 +59,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
       appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
           createTableNode.getTableSchema(), context.getOutputPath());
     } else {
-      String nullChar = context.getQueryContext().get(ConfVars.CSVFILE_NULL.varname, ConfVars.CSVFILE_NULL.defaultVal);
+      String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR);
       meta.putOption(StorageConstants.CSVFILE_NULL, nullChar);
       appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
           context.getOutputPath());
@@ -77,7 +77,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
     while((tuple = child.next()) != null) {
       appender.addTuple(tuple);
     }
-        
+
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 79d6cb3..f4160e4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -19,94 +19,90 @@
 package org.apache.tajo.engine.query;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.ConfigKey;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
 
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
 
-public class QueryContext extends KeyValueSet {
-  public static final String COMMAND_TYPE = "tajo.query.command";
-
-  public static final String STAGING_DIR = "tajo.query.staging_dir";
-
-  public static final String USER_NAME = "tajo.query.username";
-
-  public static final String OUTPUT_TABLE_NAME = "tajo.query.output.table";
-  public static final String OUTPUT_TABLE_PATH = "tajo.query.output.path";
-  public static final String OUTPUT_PARTITIONS = "tajo.query.output.partitions";
-  public static final String OUTPUT_OVERWRITE = "tajo.query.output.overwrite";
-  public static final String OUTPUT_AS_DIRECTORY = "tajo.query.output.asdirectory";
+/**
+ * QueryContext is an overridable config, and it provides a set of various configs for a query instance.
+ */
+public class QueryContext extends OverridableConf {
+  public static enum QueryVars implements ConfigKey {
+    COMMAND_TYPE,
+    STAGING_DIR,
+    OUTPUT_TABLE_NAME,
+    OUTPUT_TABLE_PATH,
+    OUTPUT_PARTITIONS,
+    OUTPUT_OVERWRITE,
+    OUTPUT_AS_DIRECTORY,
+    OUTPUT_PER_FILE_SIZE,
+    ;
 
-  public static final String TRUE_VALUE = "1";
-  public static final String FALSE_VALUE = "0";
+    QueryVars() {
+    }
 
-  public QueryContext() {}
+    @Override
+    public String keyname() {
+      return name().toLowerCase();
+    }
 
-  public QueryContext(KeyValueSetProto proto) {
-    super(proto);
+    @Override
+    public ConfigType type() {
+      return ConfigType.QUERY;
+    }
   }
 
-  public void put(TajoConf.ConfVars key, String value) {
-    put(key.varname, value);
+  public QueryContext(TajoConf conf) {
+    super(conf, ConfigKey.ConfigType.QUERY);
   }
 
-  public String get(TajoConf.ConfVars key) {
-    return get(key.varname);
+  public QueryContext(TajoConf conf, Session session) {
+    super(conf);
+    putAll(session.getAllVariables());
   }
 
-  public String get(String key) {
-    return super.get(key);
+  public QueryContext(TajoConf conf, KeyValueSetProto proto) {
+    super(conf, proto, ConfigKey.ConfigType.QUERY);
   }
 
-  public void setBool(String key, boolean val) {
-    put(key, val ? TRUE_VALUE : FALSE_VALUE);
-  }
+  //-----------------------------------------------------------------------------------------------
+  // Query Config Specified Section
+  //-----------------------------------------------------------------------------------------------
 
-  public boolean getBool(String key) {
-    String strVal = get(key);
-    return strVal != null ? strVal.equals(TRUE_VALUE) : false;
+  public String getCurrentDatabase() {
+    return get(SessionVars.CURRENT_DATABASE);
   }
 
   public void setUser(String username) {
-    put(USER_NAME, username);
+    put(SessionVars.USERNAME, username);
   }
 
   public String getUser() {
-    return get(USER_NAME);
+    return get(SessionVars.USERNAME);
   }
 
   public void setStagingDir(Path path) {
-    put(STAGING_DIR, path.toUri().toString());
+    put(QueryVars.STAGING_DIR, path.toUri().toString());
   }
 
   public Path getStagingDir() {
-    String strVal = get(STAGING_DIR);
+    String strVal = get(QueryVars.STAGING_DIR);
     return strVal != null ? new Path(strVal) : null;
   }
 
   /**
-   * The fact that QueryContext has an output table means this query has a target table.
-   * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO <table name>' statement.
-   * This config is not set if a query has INSERT (OVERWRITE) INTO LOCATION '/path/..'.
-   */
-  public boolean hasOutputTable() {
-    return get(OUTPUT_TABLE_NAME) != null;
-  }
-
-  /**
    * Set a target table name
    *
    * @param tableName The target table name
    */
   public void setOutputTable(String tableName) {
-    put(OUTPUT_TABLE_NAME, tableName);
-  }
-
-  public String getOutputTable() {
-    String strVal = get(OUTPUT_TABLE_NAME);
-    return strVal != null ? strVal : null;
+    put(QueryVars.OUTPUT_TABLE_NAME, tableName);
   }
 
   /**
@@ -116,52 +112,64 @@ public class QueryContext extends KeyValueSet {
    * @return
    */
   public boolean hasOutputPath() {
-    return get(OUTPUT_TABLE_PATH) != null;
+    return containsKey(QueryVars.OUTPUT_TABLE_PATH);
   }
 
   public void setOutputPath(Path path) {
-    put(OUTPUT_TABLE_PATH, path.toUri().toString());
+    put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString());
   }
 
   public Path getOutputPath() {
-    String strVal = get(OUTPUT_TABLE_PATH);
+    String strVal = get(QueryVars.OUTPUT_TABLE_PATH);
     return strVal != null ? new Path(strVal) : null;
   }
 
   public boolean hasPartition() {
-    return get(OUTPUT_PARTITIONS) != null;
+    return containsKey(QueryVars.OUTPUT_PARTITIONS);
   }
 
   public void setPartitionMethod(PartitionMethodDesc partitionMethodDesc) {
-    put(OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null);
+    put(QueryVars.OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null);
   }
 
   public PartitionMethodDesc getPartitionMethod() {
-    return PartitionMethodDesc.fromJson(get(OUTPUT_PARTITIONS));
+    return PartitionMethodDesc.fromJson(get(QueryVars.OUTPUT_PARTITIONS));
   }
 
   public void setOutputOverwrite() {
-    setBool(OUTPUT_OVERWRITE, true);
+    setBool(QueryVars.OUTPUT_OVERWRITE, true);
   }
 
   public boolean isOutputOverwrite() {
-    return getBool(OUTPUT_OVERWRITE);
+    return getBool(QueryVars.OUTPUT_OVERWRITE);
   }
 
   public void setFileOutput() {
-    setBool(OUTPUT_AS_DIRECTORY, true);
+    setBool(QueryVars.OUTPUT_AS_DIRECTORY, true);
+  }
+
+  public boolean containsKey(ConfigKey key) {
+    return containsKey(key.keyname());
+  }
+
+  public boolean equalKey(ConfigKey key, String another) {
+    if (containsKey(key)) {
+      return get(key).equals(another);
+    } else {
+      return false;
+    }
   }
 
-  public boolean isFileOutput() {
-    return getBool(OUTPUT_AS_DIRECTORY);
+  public boolean isCommandType(NodeType commandType) {
+    return equalKey(QueryVars.COMMAND_TYPE, commandType.name());
   }
 
   public void setCommandType(NodeType nodeType) {
-    put(COMMAND_TYPE, nodeType.name());
+    put(QueryVars.COMMAND_TYPE, nodeType.name());
   }
 
   public NodeType getCommandType() {
-    String strVal = get(COMMAND_TYPE);
+    String strVal = get(QueryVars.COMMAND_TYPE);
     return strVal != null ? NodeType.valueOf(strVal) : null;
   }
 
@@ -170,7 +178,7 @@ public class QueryContext extends KeyValueSet {
   }
 
   public boolean isCreateTable() {
-    return getCommandType() == NodeType.CREATE_TABLE;
+    return isCommandType(NodeType.CREATE_TABLE);
   }
 
   public void setInsert() {
@@ -178,14 +186,6 @@ public class QueryContext extends KeyValueSet {
   }
 
   public boolean isInsert() {
-    return getCommandType() == NodeType.INSERT;
-  }
-
-  public void setHiveQueryMode() {
-    setBool("hive.query.mode", true);
-  }
-
-  public boolean isHiveQueryMode() {
-    return getBool("hive.query.mode");
+    return isCommandType(NodeType.INSERT);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index f1af2ff..56df48d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.query;
 
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -193,7 +194,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
     if (!p.hasQueryContext()) {
       return null;
     }
-    this.queryContext = new QueryContext(p.getQueryContext());
+    this.queryContext = new QueryContext(new TajoConf(), p.getQueryContext());
     return this.queryContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 73f3cf5..37a56ba 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.algebra.Expr;
@@ -116,40 +117,25 @@ public class GlobalEngine extends AbstractService {
 
   public SubmitQueryResponse executeQuery(Session session, String query, boolean isJson) {
     LOG.info("Query: " + query);
-    QueryContext queryContext = new QueryContext();
-    queryContext.putAll(session.getAllVariables());
+    QueryContext queryContext = new QueryContext(context.getConf(), session);
     Expr planningContext;
 
     try {
       if (isJson) {
         planningContext = buildExpressionFromJson(query);
       } else {
-        // setting environment variables
-        String [] cmds = query.split(" ");
-        if(cmds != null) {
-          if(cmds[0].equalsIgnoreCase("set")) {
-            String[] params = cmds[1].split("=");
-            context.getConf().set(params[0], params[1]);
-            SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
-            responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
-            responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-            responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-            return responseBuilder.build();
-          }
-        }
-
         planningContext = buildExpressionFromSql(queryContext, query);
       }
 
       String jsonExpr = planningContext.toJson();
-      LogicalPlan plan = createLogicalPlan(session, planningContext);
+      LogicalPlan plan = createLogicalPlan(queryContext, planningContext);
       SubmitQueryResponse response = executeQueryInternal(queryContext, session, plan, query, jsonExpr);
       return response;
     } catch (Throwable t) {
       context.getSystemMetrics().counter("Query", "errorQuery").inc();
       LOG.error("\nStack Trace:\n" + StringUtils.stringifyException(t));
       SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
-      responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
+      responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME));
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
       responseBuilder.setIsForwarded(true);
       responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
@@ -183,11 +169,11 @@ public class GlobalEngine extends AbstractService {
 
     SubmitQueryResponse.Builder responseBuilder = SubmitQueryResponse.newBuilder();
     responseBuilder.setIsForwarded(false);
-    responseBuilder.setUserName(context.getConf().getVar(TajoConf.ConfVars.USERNAME));
+    responseBuilder.setUserName(queryContext.get(SessionVars.USERNAME));
 
     if (PlannerUtil.checkIfDDLPlan(rootNode)) {
       context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
-      updateQuery(session, rootNode.getChild());
+      updateQuery(queryContext, rootNode.getChild());
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
       responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
 
@@ -310,7 +296,7 @@ public class GlobalEngine extends AbstractService {
     }
 
     TaskAttemptContext taskAttemptContext =
-        new TaskAttemptContext(context.getConf(), queryContext, null, (CatalogProtos.FragmentProto[]) null, stagingDir);
+        new TaskAttemptContext(queryContext, null, (CatalogProtos.FragmentProto[]) null, stagingDir);
     taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
 
     EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
@@ -390,7 +376,7 @@ public class GlobalEngine extends AbstractService {
   }
 
 
-  public QueryId updateQuery(Session session, String sql, boolean isJson) throws IOException, SQLException, PlanningException {
+  public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws IOException, SQLException, PlanningException {
     try {
       LOG.info("SQL: " + sql);
 
@@ -402,13 +388,13 @@ public class GlobalEngine extends AbstractService {
         expr = analyzer.parse(sql);
       }
 
-      LogicalPlan plan = createLogicalPlan(session, expr);
+      LogicalPlan plan = createLogicalPlan(queryContext, expr);
       LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
       if (!PlannerUtil.checkIfDDLPlan(rootNode)) {
         throw new SQLException("This is not update query:\n" + sql);
       } else {
-        updateQuery(session, rootNode.getChild());
+        updateQuery(queryContext, rootNode.getChild());
         return QueryIdFactory.NULL_QUERY_ID;
       }
     } catch (Exception e) {
@@ -417,46 +403,46 @@ public class GlobalEngine extends AbstractService {
     }
   }
 
-  private boolean updateQuery(Session session, LogicalNode root) throws IOException {
+  private boolean updateQuery(QueryContext queryContext, LogicalNode root) throws IOException {
 
     switch (root.getType()) {
       case CREATE_DATABASE:
         CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
-        createDatabase(session, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+        createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
         return true;
       case DROP_DATABASE:
         DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
-        dropDatabase(session, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+        dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
         return true;
       case CREATE_TABLE:
         CreateTableNode createTable = (CreateTableNode) root;
-        createTable(session, createTable, createTable.isIfNotExists());
+        createTable(queryContext, createTable, createTable.isIfNotExists());
         return true;
       case DROP_TABLE:
         DropTableNode dropTable = (DropTableNode) root;
-        dropTable(session, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
+        dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
         return true;
       case ALTER_TABLESPACE:
         AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
-        alterTablespace(session, alterTablespace);
+        alterTablespace(queryContext, alterTablespace);
         return true;
       case ALTER_TABLE:
         AlterTableNode alterTable = (AlterTableNode) root;
-        alterTable(session,alterTable);
+        alterTable(queryContext,alterTable);
         return true;
       case TRUNCATE_TABLE:
         TruncateTableNode truncateTable = (TruncateTableNode) root;
-        truncateTable(session, truncateTable);
+        truncateTable(queryContext, truncateTable);
         return true;
       default:
         throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
     }
   }
 
-  private LogicalPlan createLogicalPlan(Session session, Expr expression) throws PlanningException {
+  private LogicalPlan createLogicalPlan(QueryContext queryContext, Expr expression) throws PlanningException {
 
     VerificationState state = new VerificationState();
-    preVerifier.verify(session, state, expression);
+    preVerifier.verify(queryContext, state, expression);
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
       for (String error : state.getErrorMessages()) {
@@ -465,19 +451,19 @@ public class GlobalEngine extends AbstractService {
       throw new VerifyException(sb.toString());
     }
 
-    LogicalPlan plan = planner.createPlan(session, expression);
+    LogicalPlan plan = planner.createPlan(queryContext, expression);
     if (LOG.isDebugEnabled()) {
       LOG.debug("=============================================");
       LOG.debug("Non Optimized Query: \n" + plan.toString());
       LOG.debug("=============================================");
     }
     LOG.info("Non Optimized Query: \n" + plan.toString());
-    optimizer.optimize(session, plan);
+    optimizer.optimize(queryContext, plan);
     LOG.info("=============================================");
     LOG.info("Optimized Query: \n" + plan.toString());
     LOG.info("=============================================");
 
-    annotatedPlanVerifier.verify(session, state, plan);
+    annotatedPlanVerifier.verify(queryContext, state, plan);
 
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
@@ -493,7 +479,7 @@ public class GlobalEngine extends AbstractService {
   /**
    * Alter a given table
    */
-  public void alterTablespace(final Session session, final AlterTablespaceNode alterTablespace) {
+  public void alterTablespace(final QueryContext queryContext, final AlterTablespaceNode alterTablespace) {
 
     final CatalogService catalog = context.getCatalog();
     final String spaceName = alterTablespace.getTablespaceName();
@@ -517,7 +503,7 @@ public class GlobalEngine extends AbstractService {
   /**
    * Alter a given table
    */
-  public void alterTable(final Session session, final AlterTableNode alterTable) throws IOException {
+  public void alterTable(final QueryContext queryContext, final AlterTableNode alterTable) throws IOException {
 
     final CatalogService catalog = context.getCatalog();
     final String tableName = alterTable.getTableName();
@@ -529,7 +515,7 @@ public class GlobalEngine extends AbstractService {
       databaseName = split[0];
       simpleTableName = split[1];
     } else {
-      databaseName = session.getCurrentDatabase();
+      databaseName = queryContext.getCurrentDatabase();
       simpleTableName = tableName;
     }
     final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
@@ -572,7 +558,8 @@ public class GlobalEngine extends AbstractService {
         if (existColumnName(qualifiedName, alterTable.getNewColumnName())) {
           throw new ColumnNameAlreadyExistException(alterTable.getNewColumnName());
         }
-        catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(), alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN));
+        catalog.alterTable(CatalogUtil.renameColumn(qualifiedName, alterTable.getColumnName(),
+            alterTable.getNewColumnName(), AlterTableType.RENAME_COLUMN));
         break;
       case ADD_COLUMN:
         if (existColumnName(qualifiedName, alterTable.getAddNewColumn().getSimpleName())) {
@@ -588,7 +575,8 @@ public class GlobalEngine extends AbstractService {
   /**
    * Truncate table a given table
    */
-  public void truncateTable(final Session session, final TruncateTableNode truncateTableNode) throws IOException {
+  public void truncateTable(final QueryContext queryContext, final TruncateTableNode truncateTableNode)
+      throws IOException {
     List<String> tableNames = truncateTableNode.getTableNames();
     final CatalogService catalog = context.getCatalog();
 
@@ -602,7 +590,7 @@ public class GlobalEngine extends AbstractService {
         databaseName = split[0];
         simpleTableName = split[1];
       } else {
-        databaseName = session.getCurrentDatabase();
+        databaseName = queryContext.getCurrentDatabase();
         simpleTableName = eachTableName;
       }
       final String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
@@ -641,7 +629,7 @@ public class GlobalEngine extends AbstractService {
     return tableDesc.getSchema().containsByName(columnName) ? true : false;
   }
 
-  private TableDesc createTable(Session session, CreateTableNode createTable, boolean ifNotExists) throws IOException {
+  private TableDesc createTable(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists) throws IOException {
     TableMeta meta;
 
     if (createTable.hasOptions()) {
@@ -659,7 +647,7 @@ public class GlobalEngine extends AbstractService {
         databaseName = CatalogUtil.extractQualifier(createTable.getTableName());
         tableName = CatalogUtil.extractSimpleName(createTable.getTableName());
       } else {
-        databaseName = session.getCurrentDatabase();
+        databaseName = queryContext.getCurrentDatabase();
         tableName = createTable.getTableName();
       }
 
@@ -668,11 +656,11 @@ public class GlobalEngine extends AbstractService {
       createTable.setPath(tablePath);
     }
 
-    return createTableOnPath(session, createTable.getTableName(), createTable.getTableSchema(),
+    return createTableOnPath(queryContext, createTable.getTableName(), createTable.getTableSchema(),
         meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists);
   }
 
-  public TableDesc createTableOnPath(Session session, String tableName, Schema schema, TableMeta meta,
+  public TableDesc createTableOnPath(QueryContext queryContext, String tableName, Schema schema, TableMeta meta,
                                      Path path, boolean isExternal, PartitionMethodDesc partitionDesc,
                                      boolean ifNotExists)
       throws IOException {
@@ -683,7 +671,7 @@ public class GlobalEngine extends AbstractService {
       databaseName = splitted[0];
       simpleTableName = splitted[1];
     } else {
-      databaseName = session.getCurrentDatabase();
+      databaseName = queryContext.getCurrentDatabase();
       simpleTableName = tableName;
     }
     String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);
@@ -736,7 +724,7 @@ public class GlobalEngine extends AbstractService {
     }
   }
 
-  public boolean createDatabase(@Nullable Session session, String databaseName,
+  public boolean createDatabase(@Nullable QueryContext queryContext, String databaseName,
                                 @Nullable String tablespace,
                                 boolean ifNotExists) throws IOException {
 
@@ -768,7 +756,7 @@ public class GlobalEngine extends AbstractService {
     return true;
   }
 
-  public boolean dropDatabase(Session session, String databaseName, boolean ifExists) {
+  public boolean dropDatabase(QueryContext queryContext, String databaseName, boolean ifExists) {
 
     boolean exists = catalog.existDatabase(databaseName);
     if(!exists) {
@@ -780,7 +768,7 @@ public class GlobalEngine extends AbstractService {
       }
     }
 
-    if (session.getCurrentDatabase().equals(databaseName)) {
+    if (queryContext.getCurrentDatabase().equals(databaseName)) {
       throw new RuntimeException("ERROR: Cannot drop the current open database");
     }
 
@@ -795,7 +783,7 @@ public class GlobalEngine extends AbstractService {
    * @param tableName to be dropped
    * @param purge Remove all data if purge is true.
    */
-  public boolean dropTable(Session session, String tableName, boolean ifExists, boolean purge) {
+  public boolean dropTable(QueryContext queryContext, String tableName, boolean ifExists, boolean purge) {
     CatalogService catalog = context.getCatalog();
 
     String databaseName;
@@ -805,7 +793,7 @@ public class GlobalEngine extends AbstractService {
       databaseName = splitted[0];
       simpleTableName = splitted[1];
     } else {
-      databaseName = session.getCurrentDatabase();
+      databaseName = queryContext.getCurrentDatabase();
       simpleTableName = tableName;
     }
     String qualifiedName = CatalogUtil.buildFQName(databaseName, simpleTableName);

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 97f59ef..7d80a88 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -34,6 +34,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
@@ -193,7 +194,8 @@ public class TajoMasterClientService extends AbstractService {
     }
 
     @Override
-    public BoolProto existSessionVariable(RpcController controller, SessionedStringProto request) throws ServiceException {
+    public BoolProto existSessionVariable(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
       try {
         String value = context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue());
         if (value != null) {
@@ -278,9 +280,16 @@ public class TajoMasterClientService extends AbstractService {
 
       try {
         Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        QueryContext queryContext = new QueryContext(conf, session);
+        if (queryContext.getCurrentDatabase() == null) {
+          for (Map.Entry<String,String> e : queryContext.getAllKeyValus().entrySet()) {
+            System.out.println(e.getKey() + "=" + e.getValue());
+          }
+        }
+
         UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
         try {
-          context.getGlobalEngine().updateQuery(session, request.getQuery(), request.getIsJson());
+          context.getGlobalEngine().updateQuery(queryContext, request.getQuery(), request.getIsJson());
           builder.setResultCode(ResultCode.OK);
           return builder.build();
         } catch (Exception e) {
@@ -539,7 +548,9 @@ public class TajoMasterClientService extends AbstractService {
     public BoolProto createDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
       try {
         Session session = context.getSessionManager().getSession(request.getSessionId().getId());
-        if (context.getGlobalEngine().createDatabase(session, request.getValue(), null, false)) {
+        QueryContext queryContext = new QueryContext(conf, session);
+
+        if (context.getGlobalEngine().createDatabase(queryContext, request.getValue(), null, false)) {
           return BOOL_TRUE;
         } else {
           return BOOL_FALSE;
@@ -567,8 +578,9 @@ public class TajoMasterClientService extends AbstractService {
     public BoolProto dropDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
       try {
         Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        QueryContext queryContext = new QueryContext(conf, session);
 
-        if (context.getGlobalEngine().dropDatabase(session, request.getValue(), false)) {
+        if (context.getGlobalEngine().dropDatabase(queryContext, request.getValue(), false)) {
           return BOOL_TRUE;
         } else {
           return BOOL_FALSE;
@@ -605,6 +617,10 @@ public class TajoMasterClientService extends AbstractService {
           tableName = request.getValue();
         }
 
+        if (databaseName == null) {
+          System.out.println("A");
+        }
+
         if (catalog.existsTable(databaseName, tableName)) {
           return BOOL_TRUE;
         } else {
@@ -672,6 +688,7 @@ public class TajoMasterClientService extends AbstractService {
         throws ServiceException {
       try {
         Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        QueryContext queryContext = new QueryContext(conf, session);
 
         Path path = new Path(request.getPath());
         FileSystem fs = path.getFileSystem(conf);
@@ -689,7 +706,7 @@ public class TajoMasterClientService extends AbstractService {
 
         TableDesc desc;
         try {
-          desc = context.getGlobalEngine().createTableOnPath(session, request.getName(), schema,
+          desc = context.getGlobalEngine().createTableOnPath(queryContext, request.getName(), schema,
               meta, path, true, partitionDesc, false);
         } catch (Exception e) {
           return TableResponse.newBuilder()
@@ -715,7 +732,9 @@ public class TajoMasterClientService extends AbstractService {
     public BoolProto dropTable(RpcController controller, DropTableRequest dropTable) throws ServiceException {
       try {
         Session session = context.getSessionManager().getSession(dropTable.getSessionId().getId());
-        context.getGlobalEngine().dropTable(session, dropTable.getName(), false, dropTable.getPurge());
+        QueryContext queryContext = new QueryContext(conf, session);
+
+        context.getGlobalEngine().dropTable(queryContext, dropTable.getName(), false, dropTable.getPurge());
         return BOOL_TRUE;
       } catch (Throwable t) {
         throw new ServiceException(t);

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 8bb3dde..8111ef6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.state.*;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.CatalogService;
@@ -38,7 +39,6 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
@@ -636,7 +636,7 @@ public class Query implements EventHandler<QueryEvent> {
         SubQuery lastStage = query.getSubQuery(finalExecBlockId);
         TableMeta meta = lastStage.getTableMeta();
 
-        String nullChar = queryContext.get(ConfVars.CSVFILE_NULL.varname, ConfVars.CSVFILE_NULL.defaultVal);
+        String nullChar = queryContext.get(SessionVars.NULL_CHAR);
         meta.putOption(StorageConstants.CSVFILE_NULL, nullChar);
 
         TableStats stats = lastStage.getResultStats();

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 25af82f..aed69b2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -29,10 +29,12 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoAsyncDispatcher;
@@ -83,6 +85,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private QueryMasterContext queryMasterContext;
 
+  private QueryContext queryContext;
+
   private QueryHeartbeatThread queryHeartbeatThread;
 
   private FinishedQueryMasterTaskCleanThread finishedQueryMasterTaskCleanThread;
@@ -362,12 +366,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
         try {
           queryMasterTask.stop();
-          //if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")
-         //     && !workerContext.isYarnContainerMode()) {
-          if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) {
+          if (!queryContext.getBool(SessionVars.DEBUG_ENABLED)) {
             cleanup(queryId);
           }
-          //}
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);
         }
@@ -408,6 +409,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
         queryMasterTask.start();
       }
 
+      queryContext = event.getQueryContext();
+
       synchronized(queryMasterTasks) {
         queryMasterTasks.put(event.getQueryId(), queryMasterTask);
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index f52d143..ec975d8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -204,7 +204,6 @@ public class QueryMasterManagerService extends CompositeService
     try {
       QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
           new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
-      // queryMaster terminated by internal error before task has not done
       if (queryMasterTask != null) {
         queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
       }
@@ -239,7 +238,8 @@ public class QueryMasterManagerService extends CompositeService
       LOG.info("Receive executeQuery request:" + queryId);
       queryMaster.handle(new QueryStartEvent(queryId,
           new Session(request.getSession()),
-          new QueryContext(request.getQueryContext()), request.getExprInJson().getValue(),
+          new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
+              request.getQueryContext()), request.getExprInJson().getValue(),
           request.getLogicalPlanJson().getValue()));
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 071e5d4..5885a1d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -328,8 +328,8 @@ public class QueryMasterTask extends CompositeService {
       LogicalPlanner planner = new LogicalPlanner(catalog);
       LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
-      LogicalPlan plan = planner.createPlan(session, expr);
-      optimizer.optimize(session, plan);
+      LogicalPlan plan = planner.createPlan(queryContext, expr);
+      optimizer.optimize(queryContext, plan);
 
       GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
       hookManager.addHook(new GlobalEngine.InsertHook());

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index fa1ed4c..940170c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -25,12 +25,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
@@ -50,6 +50,7 @@ import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.FetchImpl;
@@ -373,8 +374,7 @@ public class Repartitioner {
     // Getting the desire number of join tasks according to the volumn
     // of a larger table
     int largerIdx = stats[0] >= stats[1] ? 0 : 1;
-    int desireJoinTaskVolumn = subQuery.getContext().getConf().
-        getIntVar(ConfVars.DIST_QUERY_JOIN_TASK_VOLUME);
+    int desireJoinTaskVolumn = subQuery.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE);
 
     // calculate the number of tasks according to the data size
     int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576);
@@ -858,17 +858,17 @@ public class Repartitioner {
 
   // Scattered hash shuffle hashes the key columns and groups the hash keys associated with
   // the same hash key. Then, if the volume of a group is larger
-  // than DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into more than two sub groups
-  // according to DIST_QUERY_TABLE_PARTITION_VOLUME (default size = 256MB).
+  // than $DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into more than two sub groups
+  // according to $DIST_QUERY_TABLE_PARTITION_VOLUME (default size = 256MB).
   // As a result, each group size always becomes the less than or equal
-  // to DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit.
+  // to $DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit.
   // It is usually used for writing partitioned tables.
   public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext,
        SubQuery subQuery, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
        String tableName) {
     int i = 0;
-    long splitVolume = ((long) 1048576) * subQuery.getContext().getConf().
-        getIntVar(ConfVars.DIST_QUERY_TABLE_PARTITION_VOLUME); // in bytes
+    long splitVolume = StorageUnit.MB *
+        subQuery.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
 
     long sumNumBytes = 0L;
     Map<Integer, List<FetchImpl>> fetches = new HashMap<Integer, List<FetchImpl>>();

http://git-wip-us.apache.org/repos/asf/tajo/blob/ddfc3f33/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 17efa21..b6fe9da 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -32,10 +32,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
@@ -742,7 +739,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         LOG.info(subQuery.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
 
         int taskNum = (int) Math.ceil((double) mb /
-            conf.getIntVar(ConfVars.DIST_QUERY_JOIN_PARTITION_VOLUME));
+            conf.getIntVar(ConfVars.$DIST_QUERY_JOIN_PARTITION_VOLUME));
 
         int totalMem = getClusterTotalMemory(subQuery);
         LOG.info(subQuery.getId() + ", Total memory of cluster is " + totalMem + " MB");
@@ -750,8 +747,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         // determine the number of task
         taskNum = Math.min(taskNum, slots);
 
-        if (conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM) > 0) {
-          taskNum = conf.getIntVar(ConfVars.TESTCASE_MIN_TASK_NUM);
+        if (conf.getIntVar(ConfVars.$TEST_MIN_TASK_NUM) > 0) {
+          taskNum = conf.getIntVar(ConfVars.$TEST_MIN_TASK_NUM);
           LOG.warn("!!!!! TESTCASE MODE !!!!!");
         }
 
@@ -795,7 +792,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
           LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB");
           // determine the number of task
           int taskNumBySize = (int) Math.ceil((double) mb /
-              conf.getIntVar(ConfVars.DIST_QUERY_GROUPBY_PARTITION_VOLUME));
+              conf.getIntVar(ConfVars.$DIST_QUERY_GROUPBY_PARTITION_VOLUME));
 
           int totalMem = getClusterTotalMemory(subQuery);
 
@@ -1110,7 +1107,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     stopScheduler();
     releaseContainers();
 
-    if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) {
+    if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) {
       List<ExecutionBlock> childs = getMasterPlan().getChilds(getId());
       List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList();
       for (ExecutionBlock executionBlock :  childs){