You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:14:50 UTC

[3/5] tajo git commit: TAJO-838: Improve query planner to utilize index. (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
index c6466f5..9187a69 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
@@ -33,15 +33,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
@@ -297,43 +289,35 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
     
     return tuples;
   }
-  
+
   private List<Tuple> getIndexes(Schema outSchema) {
+    // Builds one tuple per index descriptor returned by the catalog's getAllIndexes().
+    // Each tuple is sized to outSchema; only the columns recognized below receive a value.
-    List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
+    List<IndexDescProto> indexList = masterContext.getCatalog().getAllIndexes();
     List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
     List<Column> columns = outSchema.getColumns();
     Tuple aTuple;
-    
-    for (IndexProto index: indexList) {
+
+    for (IndexDescProto index: indexList) {
       aTuple = new VTuple(outSchema.size());
-      
+
       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
         Column column = columns.get(fieldId);
-        
+
+        // Output columns are matched by simple name, ignoring case; for a column that matches
+        // none of the names below, put() is never called on the tuple.
         if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
+          aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getDbId()));
         } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
+          aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getTid()));
         } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
           aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
-        } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
-        } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
-        } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
-        } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
-        } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
-        } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
+        } else if ("index_method".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(index.getIndexMethod().name()));
+        } else if ("index_path".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(index.getIndexPath()));
         }
       }
-      
+
       tuples.add(aTuple);
     }
-    
+
     return tuples;
   }
   

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 249d335..f05f54b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -36,6 +36,7 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.NoSuchDatabaseException;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryContext;
@@ -57,6 +58,7 @@ import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.IPCUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
@@ -143,19 +145,17 @@ public class TajoMasterClientService extends AbstractService {
         String sessionId =
             context.getSessionManager().createSession(request.getUsername(), databaseName);
         CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
-        builder.setResultCode(ResultCode.OK);
+        builder.setResult(IPCUtil.buildOkRequestResult());
         builder.setSessionId(TajoIdProtos.SessionIdProto.newBuilder().setId(sessionId).build());
         builder.setSessionVars(ProtoUtil.convertFromMap(context.getSessionManager().getAllVariables(sessionId)));
         return builder.build();
       } catch (NoSuchDatabaseException nsde) {
         CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
-        builder.setResultCode(ResultCode.ERROR);
-        builder.setMessage(nsde.getMessage());
+        builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, nsde.getMessage(), null));
         return builder.build();
       } catch (InvalidSessionException e) {
         CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
-        builder.setResultCode(ResultCode.ERROR);
-        builder.setMessage(e.getMessage());
+        builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, e.getMessage(), null));
         return builder.build();
       }
     }
@@ -173,15 +173,14 @@ public class TajoMasterClientService extends AbstractService {
 
     public SessionUpdateResponse buildSessionUpdateOnSuccess(Map<String, String> variables) {
+      // Builds a response carrying an OK request result and the given session variables.
       SessionUpdateResponse.Builder builder = SessionUpdateResponse.newBuilder();
-      builder.setResultCode(ResultCode.OK);
+      builder.setResult(IPCUtil.buildOkRequestResult());
       builder.setSessionVars(new KeyValueSet(variables).getProto());
       return builder.build();
     }
 
     public SessionUpdateResponse buildSessionUpdateOnError(String message) {
+      // Builds a response whose request result has code ERROR and the given message;
+      // the third argument of buildRequestResult is passed as null and no session variables are set.
       SessionUpdateResponse.Builder builder = SessionUpdateResponse.newBuilder();
-      builder.setResultCode(ResultCode.ERROR);
-      builder.setMessage(message);
+      builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, message, null));
       return builder.build();
     }
 
@@ -288,12 +287,8 @@ public class TajoMasterClientService extends AbstractService {
         responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
         responseBuilder.setIsForwarded(true);
         responseBuilder.setUserName(context.getConf().getVar(ConfVars.USERNAME));
-        responseBuilder.setResultCode(ResultCode.ERROR);
-        if (e.getMessage() != null) {
-          responseBuilder.setErrorMessage(ExceptionUtils.getStackTrace(e));
-        } else {
-          responseBuilder.setErrorMessage("Internal Error");
-        }
+        responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+            e.getMessage() == null ? "Internal Error" : ExceptionUtils.getStackTrace(e), null));
         return responseBuilder.build();
       }
     }
@@ -304,23 +299,15 @@ public class TajoMasterClientService extends AbstractService {
       try {
         Session session = context.getSessionManager().getSession(request.getSessionId().getId());
         QueryContext queryContext = new QueryContext(conf, session);
-        if (queryContext.getCurrentDatabase() == null) {
-          for (Map.Entry<String,String> e : queryContext.getAllKeyValus().entrySet()) {
-            System.out.println(e.getKey() + "=" + e.getValue());
-          }
-        }
 
-        UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
+        UpdateQueryResponse.Builder responseBuilder = UpdateQueryResponse.newBuilder();
         try {
           context.getGlobalEngine().updateQuery(queryContext, request.getQuery(), request.getIsJson());
-          builder.setResultCode(ResultCode.OK);
-          return builder.build();
+          return responseBuilder.setResult(IPCUtil.buildOkRequestResult()).build();
         } catch (Exception e) {
-          builder.setResultCode(ResultCode.ERROR);
-          if (e.getMessage() == null) {
-            builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
-          }
-          return builder.build();
+          responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+              e.getMessage() == null ? ExceptionUtils.getStackTrace(e) : null, null));
+          return responseBuilder.build();
         }
       } catch (Throwable t) {
         throw new ServiceException(t);
@@ -460,7 +447,7 @@ public class TajoMasterClientService extends AbstractService {
         builder.setQueryId(request.getQueryId());
 
         if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-          builder.setResultCode(ResultCode.OK);
+          builder.setResult(IPCUtil.buildOkRequestResult());
           builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
         } else {
           QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
@@ -474,7 +461,7 @@ public class TajoMasterClientService extends AbstractService {
           }
 
           if (queryInfo != null) {
-            builder.setResultCode(ResultCode.OK);
+            builder.setResult(IPCUtil.buildOkRequestResult());
             builder.setState(queryInfo.getQueryState());
 
             boolean isCreateTable = queryInfo.getQueryContext().isCreateTable();
@@ -495,11 +482,11 @@ public class TajoMasterClientService extends AbstractService {
           } else {
             Session session = context.getSessionManager().getSession(request.getSessionId().getId());
             if (session.getNonForwardQueryResultScanner(queryId) != null) {
-              builder.setResultCode(ResultCode.OK);
+              builder.setResult(IPCUtil.buildOkRequestResult());
               builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
             } else {
-              builder.setResultCode(ResultCode.ERROR);
-              builder.setErrorMessage("No such query: " + queryId.toString());
+              builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+                  "No such query: " + queryId.toString(), null));
             }
           }
         }
@@ -531,17 +518,16 @@ public class TajoMasterClientService extends AbstractService {
         resultSetBuilder.addAllSerializedTuples(rows);
 
         builder.setResultSet(resultSetBuilder.build());
-        builder.setResultCode(ResultCode.OK);
+        builder.setResult(IPCUtil.buildOkRequestResult());
 
         LOG.info("Send result to client for " +
             request.getSessionId().getId() + "," + queryId + ", " + rows.size() + " rows");
 
       } catch (Throwable t) {
         LOG.error(t.getMessage(), t);
-        builder.setResultCode(ResultCode.ERROR);
         String errorMessage = t.getMessage() == null ? t.getClass().getName() : t.getMessage();
-        builder.setErrorMessage(errorMessage);
-        builder.setErrorTrace(org.apache.hadoop.util.StringUtils.stringifyException(t));
+        builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+            errorMessage, org.apache.hadoop.util.StringUtils.stringifyException(t)));
       }
       return builder.build();
     }
@@ -581,11 +567,11 @@ public class TajoMasterClientService extends AbstractService {
         if (queryInfo != null) {
           builder.setQueryInfo(queryInfo.getProto());
         }
-        builder.setResultCode(ResultCode.OK);
+        builder.setResult(IPCUtil.buildOkRequestResult());
       } catch (Throwable t) {
         LOG.warn(t.getMessage(), t);
-        builder.setResultCode(ResultCode.ERROR);
-        builder.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(t));
+        builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+            org.apache.hadoop.util.StringUtils.stringifyException(t), null));
       }
 
       return builder.build();
@@ -779,13 +765,13 @@ public class TajoMasterClientService extends AbstractService {
 
         if (catalog.existsTable(databaseName, tableName)) {
           return TableResponse.newBuilder()
-              .setResultCode(ResultCode.OK)
+              .setResult(IPCUtil.buildOkRequestResult())
               .setTableDesc(catalog.getTableDesc(databaseName, tableName).getProto())
               .build();
         } else {
           return TableResponse.newBuilder()
-              .setResultCode(ResultCode.ERROR)
-              .setErrorMessage("ERROR: no such a table: " + request.getTableName())
+              .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+                  "ERROR: no such a table: " + request.getTableName(), null))
               .build();
         }
       } catch (Throwable t) {
@@ -821,21 +807,21 @@ public class TajoMasterClientService extends AbstractService {
               meta, path, true, partitionDesc, false);
         } catch (Exception e) {
           return TableResponse.newBuilder()
-              .setResultCode(ResultCode.ERROR)
-              .setErrorMessage(e.getMessage()).build();
+              .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, e.getMessage(), null))
+              .build();
         }
 
         return TableResponse.newBuilder()
-            .setResultCode(ResultCode.OK)
+            .setResult(IPCUtil.buildOkRequestResult())
             .setTableDesc(desc.getProto()).build();
       } catch (InvalidSessionException ise) {
         return TableResponse.newBuilder()
-            .setResultCode(ResultCode.ERROR)
-            .setErrorMessage(ise.getMessage()).build();
+            .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, ise.getMessage(), null))
+            .build();
       } catch (IOException ioe) {
         return TableResponse.newBuilder()
-            .setResultCode(ResultCode.ERROR)
-            .setErrorMessage(ioe.getMessage()).build();
+            .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, ioe.getMessage(), null))
+            .build();
       }
     }
 
@@ -875,12 +861,184 @@ public class TajoMasterClientService extends AbstractService {
           }
         }
         return FunctionResponse.newBuilder()
-            .setResultCode(ResultCode.OK)
+            .setResult(IPCUtil.buildOkRequestResult())
             .addAllFunctions(functionProtos)
             .build();
       } catch (Throwable t) {
         throw new ServiceException(t);
       }
     }
+
+    /**
+     * Looks up an index by name and returns its protobuf description.
+     *
+     * The requested name may be qualified with a database; an unqualified name is
+     * resolved against the current database of the caller's session.
+     *
+     * @param controller RPC controller (not used by this method)
+     * @param request session id plus the index name
+     * @return the protobuf form of the index found in the catalog
+     * @throws ServiceException wrapping any throwable raised while handling the request
+     */
+    @Override
+    public IndexDescProto getIndexWithName(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        // Touch the session first, then fetch it.
+        final String sessionId = request.getSessionId().getId();
+        context.getSessionManager().touch(sessionId);
+        final Session session = context.getSessionManager().getSession(sessionId);
+
+        final String requestedName = request.getValue();
+        final String databaseName;
+        final String indexName;
+        if (!CatalogUtil.isFQTableName(requestedName)) {
+          databaseName = session.getCurrentDatabase();
+          indexName = requestedName;
+        } else {
+          final String[] parts = CatalogUtil.splitFQTableName(requestedName);
+          databaseName = parts[0];
+          indexName = parts[1];
+        }
+
+        return catalog.getIndexByName(databaseName, indexName).getProto();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Reports whether the catalog has an index with the given name.
+     *
+     * The requested name may be qualified with a database; an unqualified name is
+     * resolved against the current database of the caller's session.
+     *
+     * @param controller RPC controller (not used by this method)
+     * @param request session id plus the index name
+     * @return {@code ProtoUtil.TRUE} if the catalog reports the index, {@code ProtoUtil.FALSE} otherwise
+     * @throws ServiceException wrapping any throwable raised while handling the request
+     */
+    @Override
+    public BoolProto existIndexWithName(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        // Touch the session first, then fetch it.
+        final String sessionId = request.getSessionId().getId();
+        context.getSessionManager().touch(sessionId);
+        final Session session = context.getSessionManager().getSession(sessionId);
+
+        final String requestedName = request.getValue();
+        final String databaseName;
+        final String indexName;
+        if (!CatalogUtil.isFQTableName(requestedName)) {
+          databaseName = session.getCurrentDatabase();
+          indexName = requestedName;
+        } else {
+          final String[] parts = CatalogUtil.splitFQTableName(requestedName);
+          databaseName = parts[0];
+          indexName = parts[1];
+        }
+
+        if (catalog.existIndexByName(databaseName, indexName)) {
+          return ProtoUtil.TRUE;
+        }
+        return ProtoUtil.FALSE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Returns every index the catalog lists for a table.
+     *
+     * The requested table name may be qualified with a database; an unqualified name is
+     * resolved against the current database of the caller's session.
+     *
+     * @param controller RPC controller (not used by this method)
+     * @param request session id plus the table name
+     * @return a response with an OK result and the protobuf form of each index
+     * @throws ServiceException wrapping any throwable raised while handling the request
+     */
+    @Override
+    public GetIndexesResponse getIndexesForTable(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        // Touch the session first, then fetch it.
+        final String sessionId = request.getSessionId().getId();
+        context.getSessionManager().touch(sessionId);
+        final Session session = context.getSessionManager().getSession(sessionId);
+
+        final String requestedName = request.getValue();
+        final String databaseName;
+        final String tableName;
+        if (!CatalogUtil.isFQTableName(requestedName)) {
+          databaseName = session.getCurrentDatabase();
+          tableName = requestedName;
+        } else {
+          final String[] parts = CatalogUtil.splitFQTableName(requestedName);
+          databaseName = parts[0];
+          tableName = parts[1];
+        }
+
+        final GetIndexesResponse.Builder response = GetIndexesResponse.newBuilder();
+        for (IndexDesc each : catalog.getAllIndexesByTable(databaseName, tableName)) {
+          response.addIndexes(each.getProto());
+        }
+        return response.setResult(IPCUtil.buildOkRequestResult()).build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Reports whether the catalog has any index for the given table.
+     *
+     * The requested table name may be qualified with a database; an unqualified name is
+     * resolved against the current database of the caller's session.
+     *
+     * @param controller RPC controller (not used by this method)
+     * @param request session id plus the table name
+     * @return {@code ProtoUtil.TRUE} if the catalog reports indexes, {@code ProtoUtil.FALSE} otherwise
+     * @throws ServiceException wrapping any throwable raised while handling the request
+     */
+    @Override
+    public BoolProto existIndexesForTable(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        // Touch the session first, then fetch it.
+        final String sessionId = request.getSessionId().getId();
+        context.getSessionManager().touch(sessionId);
+        final Session session = context.getSessionManager().getSession(sessionId);
+
+        final String requestedName = request.getValue();
+        final String databaseName;
+        final String tableName;
+        if (!CatalogUtil.isFQTableName(requestedName)) {
+          databaseName = session.getCurrentDatabase();
+          tableName = requestedName;
+        } else {
+          final String[] parts = CatalogUtil.splitFQTableName(requestedName);
+          databaseName = parts[0];
+          tableName = parts[1];
+        }
+
+        if (catalog.existIndexesByTable(databaseName, tableName)) {
+          return ProtoUtil.TRUE;
+        }
+        return ProtoUtil.FALSE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Looks up the index defined on the given columns of a table.
+     *
+     * The requested table name may be qualified with a database; an unqualified name is
+     * resolved against the current database of the caller's session.
+     *
+     * @param controller RPC controller (not used by this method)
+     * @param request session id, table name and the column names of the index
+     * @return a response with an OK result and the protobuf form of the index found
+     * @throws ServiceException wrapping any throwable raised while handling the request
+     */
+    @Override
+    public GetIndexWithColumnsResponse getIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request)
+        throws ServiceException {
+      try {
+        // Touch the session first, then fetch it.
+        final String sessionId = request.getSessionId().getId();
+        context.getSessionManager().touch(sessionId);
+        final Session session = context.getSessionManager().getSession(sessionId);
+
+        final String requestedName = request.getTableName();
+        final String databaseName;
+        final String tableName;
+        if (!CatalogUtil.isFQTableName(requestedName)) {
+          databaseName = session.getCurrentDatabase();
+          tableName = requestedName;
+        } else {
+          final String[] parts = CatalogUtil.splitFQTableName(requestedName);
+          databaseName = parts[0];
+          tableName = parts[1];
+        }
+
+        // Copy the repeated column-name field into an array for the catalog call.
+        final String[] columnNames =
+            request.getColumnNamesList().toArray(new String[request.getColumnNamesCount()]);
+
+        return GetIndexWithColumnsResponse.newBuilder()
+            .setResult(IPCUtil.buildOkRequestResult())
+            .setIndexDesc(catalog.getIndexByColumnNames(databaseName, tableName, columnNames).getProto())
+            .build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Reports whether the catalog has an index on the given columns of a table.
+     *
+     * The requested table name may be qualified with a database; an unqualified name is
+     * resolved against the current database of the caller's session.
+     *
+     * @param controller RPC controller (not used by this method)
+     * @param request session id, table name and the column names of the index
+     * @return {@code ProtoUtil.TRUE} if the catalog reports such an index, {@code ProtoUtil.FALSE} otherwise
+     * @throws ServiceException wrapping any throwable raised while handling the request
+     */
+    @Override
+    public BoolProto existIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request)
+        throws ServiceException {
+      try {
+        // Touch the session first, then fetch it.
+        final String sessionId = request.getSessionId().getId();
+        context.getSessionManager().touch(sessionId);
+        final Session session = context.getSessionManager().getSession(sessionId);
+
+        final String requestedName = request.getTableName();
+        final String databaseName;
+        final String tableName;
+        if (!CatalogUtil.isFQTableName(requestedName)) {
+          databaseName = session.getCurrentDatabase();
+          tableName = requestedName;
+        } else {
+          final String[] parts = CatalogUtil.splitFQTableName(requestedName);
+          databaseName = parts[0];
+          tableName = parts[1];
+        }
+
+        // Copy the repeated column-name field into an array for the catalog call.
+        final String[] columnNames =
+            request.getColumnNamesList().toArray(new String[request.getColumnNamesCount()]);
+
+        if (catalog.existIndexByColumnNames(databaseName, tableName, columnNames)) {
+          return ProtoUtil.TRUE;
+        }
+        return ProtoUtil.FALSE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Asks the catalog to drop the index with the given name.
+     *
+     * The requested name may be qualified with a database; an unqualified name is
+     * resolved against the current database of the caller's session.
+     *
+     * @param controller RPC controller (not used by this method)
+     * @param request session id plus the index name
+     * @return {@code ProtoUtil.TRUE} if the catalog call returns true, {@code ProtoUtil.FALSE} otherwise
+     * @throws ServiceException wrapping any throwable raised while handling the request
+     */
+    @Override
+    public BoolProto dropIndex(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        // Touch the session first, then fetch it.
+        final String sessionId = request.getSessionId().getId();
+        context.getSessionManager().touch(sessionId);
+        final Session session = context.getSessionManager().getSession(sessionId);
+
+        final String requestedName = request.getValue();
+        final String databaseName;
+        final String indexName;
+        if (!CatalogUtil.isFQTableName(requestedName)) {
+          databaseName = session.getCurrentDatabase();
+          indexName = requestedName;
+        } else {
+          final String[] parts = CatalogUtil.splitFQTableName(requestedName);
+          databaseName = parts[0];
+          indexName = parts[1];
+        }
+
+        if (catalog.dropIndex(databaseName, indexName)) {
+          return ProtoUtil.TRUE;
+        }
+        return ProtoUtil.FALSE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index acbaa01..5b12438 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -68,45 +68,88 @@ public class DDLExecutor {
 
     switch (root.getType()) {
 
-    case ALTER_TABLESPACE:
-      AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
-      alterTablespace(context, queryContext, alterTablespace);
-      return true;
-
-
-    case CREATE_DATABASE:
-      CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
-      createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
-      return true;
-    case DROP_DATABASE:
-      DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
-      dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
-      return true;
-
-
-    case CREATE_TABLE:
-      CreateTableNode createTable = (CreateTableNode) root;
-      createTable(queryContext, createTable, createTable.isIfNotExists());
-      return true;
-    case DROP_TABLE:
-      DropTableNode dropTable = (DropTableNode) root;
-      dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
-      return true;
-    case TRUNCATE_TABLE:
-      TruncateTableNode truncateTable = (TruncateTableNode) root;
-      truncateTable(queryContext, truncateTable);
-      return true;
-
-    case ALTER_TABLE:
-      AlterTableNode alterTable = (AlterTableNode) root;
-      alterTable(context, queryContext, alterTable);
-      return true;
+      case ALTER_TABLESPACE:
+        AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
+        alterTablespace(context, queryContext, alterTablespace);
+        return true;
+
+
+      case CREATE_DATABASE:
+        CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
+        createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+        return true;
+      case DROP_DATABASE:
+        DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
+        dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+        return true;
+
+
+      case CREATE_TABLE:
+        CreateTableNode createTable = (CreateTableNode) root;
+        createTable(queryContext, createTable, createTable.isIfNotExists());
+        return true;
+      case DROP_TABLE:
+        DropTableNode dropTable = (DropTableNode) root;
+        dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
+        return true;
+      case TRUNCATE_TABLE:
+        TruncateTableNode truncateTable = (TruncateTableNode) root;
+        truncateTable(queryContext, truncateTable);
+        return true;
+
+      case ALTER_TABLE:
+        AlterTableNode alterTable = (AlterTableNode) root;
+        alterTable(context, queryContext, alterTable);
+        return true;
+
+      case CREATE_INDEX:
+        // The catalog information for the created index is automatically updated when the query is successfully finished.
+        // See the Query.CreateIndexHook class.
+        return true;
+
+      case DROP_INDEX:
+        DropIndexNode dropIndexNode = (DropIndexNode) root;
+        dropIndex(queryContext, dropIndexNode);
+        return true;
 
     default:
       throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
     }
   }
 
+  /**
+   * Drops an index: removes its entry from the catalog, then deletes the index files.
+   *
+   * The index name may be qualified with a database; an unqualified name is resolved
+   * against the current database of the query context.
+   *
+   * @param queryContext query context supplying the current database for unqualified names
+   * @param dropIndexNode plan node naming the index to drop
+   * @throws NoSuchIndexException if the catalog has no index with the resolved name
+   * @throws CatalogException if the catalog call to drop the index returns false
+   * @throws InternalError if deleting the index files raises an I/O error; the original
+   *         exception is attached as the cause
+   */
+  public void dropIndex(final QueryContext queryContext, final DropIndexNode dropIndexNode) {
+    String databaseName, simpleIndexName;
+    if (CatalogUtil.isFQTableName(dropIndexNode.getIndexName())) {
+      String [] splits = CatalogUtil.splitFQTableName(dropIndexNode.getIndexName());
+      databaseName = splits[0];
+      simpleIndexName = splits[1];
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleIndexName = dropIndexNode.getIndexName();
+    }
+
+    if (!catalog.existIndexByName(databaseName, simpleIndexName)) {
+      throw new NoSuchIndexException(simpleIndexName);
+    }
+
+    // Fetch the descriptor before dropping: its path is needed afterwards to delete the files.
+    IndexDesc desc = catalog.getIndexByName(databaseName, simpleIndexName);
+
+    if (!catalog.dropIndex(databaseName, simpleIndexName)) {
+      // This is a failure, so log it as an error rather than as routine information.
+      String message = "Cannot drop index \"" + simpleIndexName + "\".";
+      LOG.error(message);
+      throw new CatalogException(message);
+    }
+
+    Path indexPath = new Path(desc.getIndexPath());
+    try {
+      FileSystem fs = indexPath.getFileSystem(context.getConf());
+      if (!fs.delete(indexPath, true)) {
+        // The catalog entry is already gone; report the leftover files instead of staying silent.
+        LOG.warn("Index \"" + simpleIndexName + "\" was removed from the catalog, but its files at "
+            + indexPath + " could not be deleted.");
+      }
+    } catch (IOException e) {
+      // Keep the original exception as the cause so the stack trace is not lost.
+      InternalError error = new InternalError(e.getMessage());
+      error.initCause(e);
+      throw error;
+    }
+
+    LOG.info("Index " + simpleIndexName + " is dropped.");
+  }
+
   /**
    * Alter a given table
    */

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2242445..26476a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -29,8 +29,10 @@ import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.exception.AlreadyExistsIndexException;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
@@ -40,6 +42,7 @@ import org.apache.tajo.engine.planner.physical.EvalExprExec;
 import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
 import org.apache.tajo.master.NonForwardQueryResultFileScanner;
 import org.apache.tajo.master.NonForwardQueryResultScanner;
@@ -58,6 +61,7 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.util.IPCUtil;
 import org.apache.tajo.util.ProtoUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -98,10 +102,18 @@ public class QueryExecutor {
 
     } else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
       context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
-      ddlExecutor.execute(queryContext, plan);
-      response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      response.setResultCode(ClientProtos.ResultCode.OK);
 
+      if (PlannerUtil.isDistExecDDL(rootNode)) {
+        if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) {
+          checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild());
+        }
+        executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
+      } else {
+        response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+        response.setResult(IPCUtil.buildOkRequestResult());
+      }
+
+      ddlExecutor.execute(queryContext, plan);
 
     } else if (plan.isExplain()) { // explain query
       execExplain(plan, response);
@@ -142,8 +154,8 @@ public class QueryExecutor {
         session.selectDatabase(setSessionNode.getValue());
       } else {
         response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-        response.setResultCode(ClientProtos.ResultCode.ERROR);
-        response.setErrorMessage("database \"" + databaseName + "\" does not exists.");
+        response.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+            "database \"" + databaseName + "\" does not exists.", null));
       }
 
       // others
@@ -157,7 +169,7 @@ public class QueryExecutor {
 
     context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
     response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-    response.setResultCode(ClientProtos.ResultCode.OK);
+    response.setResult(IPCUtil.buildOkRequestResult());
   }
 
   public void execExplain(LogicalPlan plan, SubmitQueryResponse.Builder response) throws IOException {
@@ -183,7 +195,7 @@ public class QueryExecutor {
 
     response.setResultSet(serializedResBuilder.build());
     response.setMaxRowNum(lines.length);
-    response.setResultCode(ClientProtos.ResultCode.OK);
+    response.setResult(IPCUtil.buildOkRequestResult());
     response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
   }
 
@@ -205,7 +217,7 @@ public class QueryExecutor {
     response.setQueryId(queryId.getProto());
     response.setMaxRowNum(maxRow);
     response.setTableDesc(queryResultScanner.getTableDesc().getProto());
-    response.setResultCode(ClientProtos.ResultCode.OK);
+    response.setResult(IPCUtil.buildOkRequestResult());
   }
 
   public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan,
@@ -235,7 +247,7 @@ public class QueryExecutor {
     response.setQueryId(queryId.getProto());
     response.setMaxRowNum(maxRow);
     response.setTableDesc(desc.getProto());
-    response.setResultCode(ClientProtos.ResultCode.OK);
+    response.setResult(IPCUtil.buildOkRequestResult());
   }
 
   public void execNonFromQuery(QueryContext queryContext, Session session, String query,
@@ -267,7 +279,7 @@ public class QueryExecutor {
       responseBuilder.setResultSet(serializedResBuilder);
       responseBuilder.setMaxRowNum(1);
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+      responseBuilder.setResult(IPCUtil.buildOkRequestResult());
     }
   }
 
@@ -369,7 +381,7 @@ public class QueryExecutor {
     // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
     responseBuilder.setMaxRowNum(-1);
     responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+    responseBuilder.setResult(IPCUtil.buildOkRequestResult());
   }
 
   public void executeDistributedQuery(QueryContext queryContext, Session session,
@@ -398,13 +410,13 @@ public class QueryExecutor {
 
     if(queryInfo == null) {
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-      responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
-      responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+      responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+          "Fail starting QueryMaster.", null));
       LOG.error("Fail starting QueryMaster: " + sql);
     } else {
       responseBuilder.setIsForwarded(true);
       responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
-      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+      responseBuilder.setResult(IPCUtil.buildOkRequestResult());
       if(queryInfo.getQueryMasterHost() != null) {
         responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
       }
@@ -413,4 +425,23 @@ public class QueryExecutor {
           " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
     }
   }
+
+  private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
+      throws IOException {
+    String databaseName, simpleIndexName, qualifiedIndexName;
+    if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+      String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+      databaseName = splits[0];
+      simpleIndexName = splits[1];
+      qualifiedIndexName = createIndexNode.getIndexName();
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleIndexName = createIndexNode.getIndexName();
+      qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+    }
+
+    if (catalog.existIndexByName(databaseName, simpleIndexName)) {
+      throw new AlreadyExistsIndexException(qualifiedIndexName);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index a626df1..88ecebc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -32,6 +32,8 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
@@ -472,6 +474,7 @@ public class Query implements EventHandler<QueryEvent> {
         hookList.add(new MaterializedResultHook());
         hookList.add(new CreateTableHook());
         hookList.add(new InsertTableHook());
+        hookList.add(new CreateIndexHook());
       }
 
       public void execute(QueryContext queryContext, Query query,
@@ -485,6 +488,48 @@ public class Query implements EventHandler<QueryEvent> {
       }
     }
 
+    private class CreateIndexHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
+        Stage lastStage = query.getStage(finalExecBlockId);
+        return  lastStage.getBlock().getPlan().getType() == NodeType.CREATE_INDEX;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        Stage lastStage = query.getStage(finalExecBlockId);
+
+        CreateIndexNode createIndexNode = (CreateIndexNode) lastStage.getBlock().getPlan();
+        String databaseName, simpleIndexName, qualifiedIndexName;
+        if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+          String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+          databaseName = splits[0];
+          simpleIndexName = splits[1];
+          qualifiedIndexName = createIndexNode.getIndexName();
+        } else {
+          databaseName = queryContext.getCurrentDatabase();
+          simpleIndexName = createIndexNode.getIndexName();
+          qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+        }
+        ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN);
+        if (scanNode == null) {
+          throw new IOException("Cannot find the table of the relation");
+        }
+        IndexDesc indexDesc = new IndexDesc(databaseName, scanNode.getTableName(),
+            simpleIndexName, createIndexNode.getIndexPath(),
+            createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(),
+            createIndexNode.isUnique(), false, scanNode.getLogicalSchema());
+        if (catalog.createIndex(indexDesc)) {
+          LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + ".");
+        } else {
+          LOG.info("Index creation " + qualifiedIndexName + " is failed.");
+          throw new CatalogException("Cannot create index \"" + qualifiedIndexName + "\".");
+        }
+      }
+    }
+
     private class MaterializedResultHook implements QueryHook {
 
       @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 9c789a5..a125415 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -38,6 +38,12 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;
@@ -48,15 +54,9 @@ import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.session.Session;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
@@ -347,9 +347,10 @@ public class QueryMasterTask extends CompositeService {
         LOG.warn("Query already started");
         return;
       }
+      LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
       CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
       LogicalPlanner planner = new LogicalPlanner(catalog);
-      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
+      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
       jsonExpr = null; // remove the possible OOM
       plan = planner.createPlan(queryContext, expr);
@@ -393,6 +394,14 @@ public class QueryMasterTask extends CompositeService {
             tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
           }
         }
+
+        scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN);
+        if (scanNodes != null) {
+          for (LogicalNode eachScanNode : scanNodes) {
+            ScanNode scanNode = (ScanNode) eachScanNode;
+            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+          }
+        }
       }
       MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
       queryMasterContext.getGlobalPlanner().build(masterPlan);

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java
new file mode 100644
index 0000000..dcffe62
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.ipc.ClientProtos.RequestResult;
+import org.apache.tajo.ipc.ClientProtos.ResultCode;
+
+public class IPCUtil {
+
+  public static RequestResult buildOkRequestResult() {
+    return buildRequestResult(ResultCode.OK, null, null);
+  }
+
+  public static RequestResult buildRequestResult(ResultCode code,
+                                                 @Nullable String errorMessage,
+                                                 @Nullable String errorTrace) {
+    RequestResult.Builder builder = RequestResult.newBuilder();
+    builder.setResultCode(code);
+    if (errorMessage != null) {
+      builder.setErrorMessage(errorMessage);
+    }
+    if (errorTrace != null) {
+      builder.setErrorTrace(errorTrace);
+    }
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
deleted file mode 100644
index 3147bb6..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.util;
-
-import com.google.gson.Gson;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.logical.IndexScanNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-public class IndexUtil {
-  public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
-    StringBuilder builder = new StringBuilder(); 
-    builder.append(fragment.getPath().getName() + "_");
-    builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_");
-    for(int i = 0 ; i < keys.length ; i ++) {
-      builder.append(keys[i].getSortKey().getSimpleName()+"_");
-    }
-    builder.append("_index");
-    return builder.toString();
-       
-  }
-  
-  public static String getIndexName(String indexName , SortSpec[] keys) {
-    StringBuilder builder = new StringBuilder();
-    builder.append(indexName + "_");
-    for(int i = 0 ; i < keys.length ; i ++) {
-      builder.append(keys[i].getSortKey().getSimpleName() + "_");
-    }
-    return builder.toString();
-  }
-  
-  public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode,
-      Iterator<Entry<String, String>> iter ) {
-   
-    EvalNode qual = scanNode.getQual();
-    Gson gson = CoreGsonHelper.getInstance();
-    
-    FieldAndValueFinder nodeFinder = new FieldAndValueFinder();
-    qual.preOrder(nodeFinder);
-    LinkedList<BinaryEval> nodeList = nodeFinder.getNodeList();
-    
-    int maxSize = Integer.MIN_VALUE;
-    SortSpec[] maxIndex = null;
-    
-    String json;
-    while(iter.hasNext()) {
-      Entry<String , String> entry = iter.next();
-      json = entry.getValue();
-      SortSpec[] sortKey = gson.fromJson(json, SortSpec[].class);
-      if(sortKey.length > nodeList.size()) {
-        /* If the number of the sort key is greater than where condition, 
-         * this index cannot be used
-         * */
-        continue; 
-      } else {
-        boolean[] equal = new boolean[sortKey.length];
-        for(int i = 0 ; i < sortKey.length ; i ++) {
-          for(int j = 0 ; j < nodeList.size() ; j ++) {
-            Column col = ((FieldEval)(nodeList.get(j).getLeftExpr())).getColumnRef();
-            if(col.equals(sortKey[i].getSortKey())) {
-              equal[i] = true;
-            }
-          }
-        }
-        boolean chk = true;
-        for(int i = 0 ; i < equal.length ; i ++) {
-          chk = chk && equal[i];
-        }
-        if(chk) {
-          if(maxSize < sortKey.length) {
-            maxSize = sortKey.length;
-            maxIndex = sortKey;
-          }
-        }
-      }
-    }
-    if(maxIndex == null) {
-      return null;
-    } else {
-      Schema keySchema = new Schema();
-      for(int i = 0 ; i < maxIndex.length ; i ++ ) {
-        keySchema.addColumn(maxIndex[i].getSortKey());
-      }
-      Datum[] datum = new Datum[nodeList.size()];
-      for(int i = 0 ; i < nodeList.size() ; i ++ ) {
-        datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue();
-      }
-      
-      return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex);
-    }
-
-  }
-  
-  
-  private static class FieldAndValueFinder implements EvalNodeVisitor {
-    private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>();
-    
-    public LinkedList<BinaryEval> getNodeList () {
-      return this.nodeList;
-    }
-    
-    @Override
-    public void visit(EvalNode node) {
-      BinaryEval binaryEval = (BinaryEval) node;
-      switch(node.getType()) {
-      case AND:
-        break;
-      case EQUAL:
-        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
-          && binaryEval.getRightExpr().getType() == EvalType.CONST ) {
-          nodeList.add(binaryEval);
-        }
-        break;
-      case IS_NULL:
-        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
-          && binaryEval.getRightExpr().getType() == EvalType.CONST) {
-          nodeList.add(binaryEval);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index da25fe6..cd9e6ef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -304,7 +304,7 @@ public class QueryExecutorServlet extends HttpServlet {
           LOG.error("Internal Error: SubmissionResponse is NULL");
           error = new Exception("Internal Error: SubmissionResponse is NULL");
 
-        } else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+        } else if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
           if (response.getIsForwarded()) {
             queryId = new QueryId(response.getQueryId());
             getQueryResult(queryId);
@@ -316,9 +316,9 @@ public class QueryExecutorServlet extends HttpServlet {
 
             progress.set(100);
           }
-        } else if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
-          if (response.hasErrorMessage()) {
-            StringBuffer errorMessage = new StringBuffer(response.getErrorMessage());
+        } else if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
+          if (response.getResult().hasErrorMessage()) {
+            StringBuffer errorMessage = new StringBuffer(response.getResult().getErrorMessage());
             String modifiedMessage;
 
             if (errorMessage.length() > 200) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 1c83110..6a13898 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -25,10 +25,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
 import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
+import org.apache.tajo.ipc.ClientProtos.RequestResult;
 import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -129,14 +132,32 @@ public class TajoWorkerClientService extends AbstractService {
         if (queryHistory != null) {
           builder.setQueryHistory(queryHistory.getProto());
         }
-        builder.setResultCode(ResultCode.OK);
+        builder.setResult(buildOkRequestResult());
       } catch (Throwable t) {
         LOG.warn(t.getMessage(), t);
-        builder.setResultCode(ResultCode.ERROR);
-        builder.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(t));
+        builder.setResult(buildRequestResult(ResultCode.ERROR,
+            StringUtils.stringifyException(t), null));
       }
 
       return builder.build();
     }
+
+    private RequestResult buildOkRequestResult() {
+      return buildRequestResult(ResultCode.OK, null, null);
+    }
+
+    private RequestResult buildRequestResult(ResultCode code,
+                                                    @Nullable String errorMessage,
+                                                    @Nullable String errorTrace) {
+      RequestResult.Builder builder = RequestResult.newBuilder();
+      builder.setResultCode(code);
+      if (errorMessage != null) {
+        builder.setErrorMessage(errorMessage);
+      }
+      if (errorTrace != null) {
+        builder.setErrorTrace(errorTrace);
+      }
+      return builder.build();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 5f9c6ac..b784c64 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -125,21 +125,9 @@ public class Task {
     this.inputStats = new TableStats();
 
     plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan());
-    LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
-    if (scanNode != null) {
-      for (LogicalNode node : scanNode) {
-        ScanNode scan = (ScanNode) node;
-        descs.put(scan.getCanonicalName(), scan.getTableDesc());
-      }
-    }
-
-    LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
-    if (partitionScanNode != null) {
-      for (LogicalNode node : partitionScanNode) {
-        PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
-        descs.put(scan.getCanonicalName(), scan.getTableDesc());
-      }
-    }
+    updateDescsForScanNodes(NodeType.SCAN);
+    updateDescsForScanNodes(NodeType.PARTITIONS_SCAN);
+    updateDescsForScanNodes(NodeType.INDEX_SCAN);
 
     interQuery = request.getProto().getInterQuery();
     if (interQuery) {
@@ -181,6 +169,17 @@ public class Task {
     LOG.info("==================================");
   }
 
+  private void updateDescsForScanNodes(NodeType nodeType) {
+    assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN;
+    LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType);
+    if (scanNodes != null) {
+      for (LogicalNode node : scanNodes) {
+        ScanNode scanNode = (ScanNode) node;
+        descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+      }
+    }
+  }
+
   public void init() throws IOException {
     if (context.getState() == TaskAttemptState.TA_PENDING) {
       // initialize a task temporal dir

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index dfc8a9b..6c0af89 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -379,12 +379,15 @@ public class TaskAttemptContext {
     return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
   }
 
-  public long getUniqueKeyFromFragments() {
-    List<FragmentProto> totalFragments = new ArrayList<FragmentProto>();
-    for (List<FragmentProto> eachFragments : fragmentMap.values()) {
-      totalFragments.addAll(eachFragments);
+  public String getUniqueKeyFromFragments() {
+    StringBuilder sb = new StringBuilder();
+    for (List<FragmentProto> fragments : fragmentMap.values()) {
+      for (FragmentProto f : fragments) {
+        FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f);
+        sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
+      }
     }
-    return Objects.hashCode(totalFragments.toArray(new FragmentProto[totalFragments.size()]));
+    return sb.toString();
   }
 
   public int hashCode() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 4e4b710..615dbb9 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -36,11 +36,11 @@ import org.apache.tajo.engine.codegen.TajoClassLoader;
 import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.serder.EvalNodeDeserializer;
 import org.apache.tajo.plan.serder.EvalNodeSerializer;
-import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
@@ -98,7 +98,7 @@ public class ExprTestBase {
     analyzer = new SQLAnalyzer();
     preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
     planner = new LogicalPlanner(cat);
-    optimizer = new LogicalOptimizer(util.getConfiguration());
+    optimizer = new LogicalOptimizer(util.getConfiguration(), cat);
     annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index c9b52fd..3122b25 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -104,9 +104,9 @@ public class TestLogicalOptimizer {
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer(util.getConfiguration());
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
+    optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 996d736..0bf4602 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -497,7 +497,7 @@ public class TestLogicalPlanner {
     Schema expected = tpch.getOutSchema("q2");
     assertSchema(expected, node.getOutSchema());
 
-    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
     optimizer.optimize(plan);
 
     LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.JOIN);
@@ -536,7 +536,7 @@ public class TestLogicalPlanner {
     LogicalNode node = plan.getRootBlock().getRoot();
     testJsonSerDerObject(node);
 
-    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
     optimizer.optimize(plan);
 
     LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN);
@@ -577,7 +577,7 @@ public class TestLogicalPlanner {
     LogicalNode node = plan.getRootBlock().getRoot();
     testJsonSerDerObject(node);
 
-    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
     optimizer.optimize(plan);
 
     LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN);
@@ -624,7 +624,7 @@ public class TestLogicalPlanner {
     LogicalNode node = plan.getRootBlock().getRoot();
     testJsonSerDerObject(node);
 
-    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
     optimizer.optimize(plan);
 
     Map<BinaryEval, Boolean> scanMap = TUtil.newHashMap();

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
index 3803c7a..9f20776 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -180,7 +180,7 @@ public class TestBroadcastJoinPlan {
         "join small2 on small1_id = small2_id";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -241,7 +241,7 @@ public class TestBroadcastJoinPlan {
         "join small3 on small1_id = small3_id";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -305,7 +305,7 @@ public class TestBroadcastJoinPlan {
         "join large2 on large1_id = large2_id ";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -333,7 +333,7 @@ public class TestBroadcastJoinPlan {
         "join small2 on large2_id = small2_id";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -383,7 +383,7 @@ public class TestBroadcastJoinPlan {
         "join small2 on a.small1_id = small2_id";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -424,7 +424,7 @@ public class TestBroadcastJoinPlan {
         "join (select * from small1) a on large1_id = a.small1_id";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -480,7 +480,7 @@ public class TestBroadcastJoinPlan {
         "left outer join large2 on small1_id = large2_id ";
 
         LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -534,7 +534,7 @@ public class TestBroadcastJoinPlan {
         "left outer join small3 on large1_id = small3_id ";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -617,7 +617,7 @@ public class TestBroadcastJoinPlan {
         "left outer join small3 on large3_id = small3_id ";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -700,7 +700,7 @@ public class TestBroadcastJoinPlan {
         "left outer join small3 on small1_id = small3_id ";
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -759,7 +759,7 @@ public class TestBroadcastJoinPlan {
         "left outer join small3 on small1_id = small3_id " ;
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -812,7 +812,7 @@ public class TestBroadcastJoinPlan {
         "left outer join small3 on small1_id = small3_id " ;
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -904,7 +904,7 @@ public class TestBroadcastJoinPlan {
         "left outer join small3 on small3_id = large1_id " ;
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr =  analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 
@@ -969,7 +969,7 @@ public class TestBroadcastJoinPlan {
         "left outer join small2 on large1_id = small2_id " ;
 
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
     Expr expr = analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
deleted file mode 100644
index c897461..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.Stack;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-
-public class TestBSTIndexExec {
-
-  private TajoConf conf;
-  private Path idxPath;
-  private CatalogService catalog;
-  private SQLAnalyzer analyzer;
-  private LogicalPlanner planner;
-  private LogicalOptimizer optimizer;
-  private FileStorageManager sm;
-  private Schema idxSchema;
-  private BaseTupleComparator comp;
-  private BSTIndex.BSTIndexWriter writer;
-  private HashMap<Integer , Integer> randomValues ;
-  private int rndKey = -1;
-  private FileSystem fs;
-  private TableMeta meta;
-  private Path tablePath;
-
-  private Random rnd = new Random(System.currentTimeMillis());
-
-  private TajoTestingCluster util;
-
-  @Before
-  public void setup() throws Exception {
-    this.randomValues = new HashMap<Integer, Integer>();
-    this.conf = new TajoConf();
-    util = new TajoTestingCluster();
-    util.startCatalogCluster();
-    catalog = util.getMiniCatalogCluster().getCatalog();
-
-    Path workDir = CommonTestingUtil.getTestDir();
-    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
-    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir);
-
-    idxPath = new Path(workDir, "test.idx");
-
-    Schema schema = new Schema();
-    schema.addColumn("managerid", Type.INT4);
-    schema.addColumn("empid", Type.INT4);
-    schema.addColumn("deptname", Type.TEXT);
-
-    this.idxSchema = new Schema();
-    idxSchema.addColumn("managerid", Type.INT4);
-    SortSpec[] sortKeys = new SortSpec[1];
-    sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
-    this.comp = new BaseTupleComparator(idxSchema, sortKeys);
-
-    this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
-        BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);
-    writer.setLoadNum(100);
-    writer.open();
-    long offset;
-
-    meta = CatalogUtil.newTableMeta(StoreType.CSV);
-    tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv");
-    fs = tablePath.getFileSystem(conf);
-    fs.mkdirs(tablePath.getParent());
-
-    FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath);
-    appender.init();
-    Tuple tuple = new VTuple(schema.size());
-    for (int i = 0; i < 10000; i++) {
-      
-      Tuple key = new VTuple(this.idxSchema.size());
-      int rndKey = rnd.nextInt(250);
-      if(this.randomValues.containsKey(rndKey)) {
-        int t = this.randomValues.remove(rndKey) + 1;
-        this.randomValues.put(rndKey, t);
-      } else {
-        this.randomValues.put(rndKey, 1);
-      }
-      
-      key.put(new Datum[] { DatumFactory.createInt4(rndKey) });
-      tuple.put(new Datum[] { DatumFactory.createInt4(rndKey),
-          DatumFactory.createInt4(rnd.nextInt(10)),
-          DatumFactory.createText("dept_" + rnd.nextInt(10)) });
-      offset = appender.getOffset();
-      appender.addTuple(tuple);
-      writer.write(key, offset);
-    }
-    appender.flush();
-    appender.close();
-    writer.close();
-
-    TableDesc desc = new TableDesc(
-        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta,
-        sm.getTablePath("employee").toUri());
-    catalog.createTable(desc);
-
-    analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer(conf);
-  }
-
-  @After
-  public void tearDown() {
-    util.shutdownCatalogCluster();
-  }
-
-  @Test
-  public void testEqual() throws Exception {
-    this.rndKey = rnd.nextInt(250);
-    final String QUERY = "select * from employee where managerId = " + rndKey;
-    
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
-    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
-    Expr expr = analyzer.parse(QUERY);
-    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    TmpPlanner phyPlanner = new TmpPlanner(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
-    int tupleCount = this.randomValues.get(rndKey);
-    int counter = 0;
-    exec.init();
-    while (exec.next() != null) {
-      counter ++;
-    }
-    exec.close();
-    assertEquals(tupleCount , counter);
-  }
-
-  private class TmpPlanner extends PhysicalPlannerImpl {
-    public TmpPlanner(TajoConf conf) {
-      super(conf);
-    }
-
-    @Override
-    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
-        throws IOException {
-      Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
-          "Error: There is no table matched to %s", scanNode.getTableName());
-
-      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName()));
-      
-      Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
-
-      return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum);
-
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 64da88b..a64b525 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -63,6 +63,7 @@ public class TestHashAntiJoinExec {
   private LogicalOptimizer optimizer;
   private StorageManager sm;
   private Path testDir;
+  private QueryContext queryContext;
 
   private TableDesc employee;
   private TableDesc people;
@@ -128,11 +129,12 @@ public class TestHashAntiJoinExec {
     appender.flush();
     appender.close();
 
+    queryContext = new QueryContext(conf);
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer(conf);
+    optimizer = new LogicalOptimizer(conf, catalog);
   }
 
   @After
@@ -159,7 +161,7 @@ public class TestHashAntiJoinExec {
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
         LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 4e218c5..196f3bf 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -64,6 +64,7 @@ public class TestHashSemiJoinExec {
   private LogicalOptimizer optimizer;
   private StorageManager sm;
   private Path testDir;
+  private QueryContext queryContext;
 
   private TableDesc employee;
   private TableDesc people;
@@ -133,11 +134,12 @@ public class TestHashSemiJoinExec {
     appender.flush();
     appender.close();
 
+    queryContext = new QueryContext(conf);
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer(conf);
+    optimizer = new LogicalOptimizer(conf, catalog);
   }
 
   @After
@@ -164,7 +166,7 @@ public class TestHashSemiJoinExec {
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
         LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index a4e49f7..9de58a3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -171,12 +171,13 @@ public class TestPhysicalPlanner {
     }
     appender.flush();
     appender.close();
+
+    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
     catalog.createTable(score);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer(conf);
+    optimizer = new LogicalOptimizer(conf, catalog);
 
-    defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
     masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
 
     createLargeScoreTable();

http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 2e093c1..84abfff 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -61,6 +61,7 @@ public class TestSortExec {
   private static Path workDir;
   private static Path tablePath;
   private static TableMeta employeeMeta;
+  private static QueryContext queryContext;
 
   private static Random rnd = new Random(System.currentTimeMillis());
 
@@ -101,9 +102,10 @@ public class TestSortExec {
         tablePath.toUri());
     catalog.createTable(desc);
 
+    queryContext = new QueryContext(conf);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer(conf);
+    optimizer = new LogicalOptimizer(conf, catalog);
   }
 
   public static String[] QUERIES = {
@@ -113,7 +115,7 @@ public class TestSortExec {
   public final void testNext() throws IOException, PlanningException {
     FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
         LocalTajoTestingUtility
         .newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
     ctx.setEnforcer(new Enforcer());