You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:14:50 UTC
[3/5] tajo git commit: TAJO-838: Improve query planner to utilize
index. (jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
index c6466f5..9187a69 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultSystemScanner.java
@@ -33,15 +33,7 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
@@ -297,43 +289,35 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
return tuples;
}
-
+
private List<Tuple> getIndexes(Schema outSchema) {
- List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
+ List<IndexDescProto> indexList = masterContext.getCatalog().getAllIndexes();
List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
List<Column> columns = outSchema.getColumns();
Tuple aTuple;
-
- for (IndexProto index: indexList) {
+
+ for (IndexDescProto index: indexList) {
aTuple = new VTuple(outSchema.size());
-
+
for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
Column column = columns.get(fieldId);
-
+
if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
+ aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getDbId()));
} else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
+ aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getTid()));
} else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
- } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
- } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
- } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
- } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
- } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
- } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
+ } else if ("index_method".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getIndexMethod().name()));
+ } else if ("index_path".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getIndexPath()));
}
}
-
+
tuples.add(aTuple);
}
-
+
return tuples;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 249d335..f05f54b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -36,6 +36,7 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.NoSuchDatabaseException;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryContext;
@@ -57,6 +58,7 @@ import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.IPCUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
@@ -143,19 +145,17 @@ public class TajoMasterClientService extends AbstractService {
String sessionId =
context.getSessionManager().createSession(request.getUsername(), databaseName);
CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
- builder.setResultCode(ResultCode.OK);
+ builder.setResult(IPCUtil.buildOkRequestResult());
builder.setSessionId(TajoIdProtos.SessionIdProto.newBuilder().setId(sessionId).build());
builder.setSessionVars(ProtoUtil.convertFromMap(context.getSessionManager().getAllVariables(sessionId)));
return builder.build();
} catch (NoSuchDatabaseException nsde) {
CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
- builder.setResultCode(ResultCode.ERROR);
- builder.setMessage(nsde.getMessage());
+ builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, nsde.getMessage(), null));
return builder.build();
} catch (InvalidSessionException e) {
CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
- builder.setResultCode(ResultCode.ERROR);
- builder.setMessage(e.getMessage());
+ builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, e.getMessage(), null));
return builder.build();
}
}
@@ -173,15 +173,14 @@ public class TajoMasterClientService extends AbstractService {
public SessionUpdateResponse buildSessionUpdateOnSuccess(Map<String, String> variables) {
SessionUpdateResponse.Builder builder = SessionUpdateResponse.newBuilder();
- builder.setResultCode(ResultCode.OK);
+ builder.setResult(IPCUtil.buildOkRequestResult());
builder.setSessionVars(new KeyValueSet(variables).getProto());
return builder.build();
}
public SessionUpdateResponse buildSessionUpdateOnError(String message) {
SessionUpdateResponse.Builder builder = SessionUpdateResponse.newBuilder();
- builder.setResultCode(ResultCode.ERROR);
- builder.setMessage(message);
+ builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, message, null));
return builder.build();
}
@@ -288,12 +287,8 @@ public class TajoMasterClientService extends AbstractService {
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
responseBuilder.setIsForwarded(true);
responseBuilder.setUserName(context.getConf().getVar(ConfVars.USERNAME));
- responseBuilder.setResultCode(ResultCode.ERROR);
- if (e.getMessage() != null) {
- responseBuilder.setErrorMessage(ExceptionUtils.getStackTrace(e));
- } else {
- responseBuilder.setErrorMessage("Internal Error");
- }
+ responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+ e.getMessage() == null ? "Internal Error" : ExceptionUtils.getStackTrace(e), null));
return responseBuilder.build();
}
}
@@ -304,23 +299,15 @@ public class TajoMasterClientService extends AbstractService {
try {
Session session = context.getSessionManager().getSession(request.getSessionId().getId());
QueryContext queryContext = new QueryContext(conf, session);
- if (queryContext.getCurrentDatabase() == null) {
- for (Map.Entry<String,String> e : queryContext.getAllKeyValus().entrySet()) {
- System.out.println(e.getKey() + "=" + e.getValue());
- }
- }
- UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
+ UpdateQueryResponse.Builder responseBuilder = UpdateQueryResponse.newBuilder();
try {
context.getGlobalEngine().updateQuery(queryContext, request.getQuery(), request.getIsJson());
- builder.setResultCode(ResultCode.OK);
- return builder.build();
+ return responseBuilder.setResult(IPCUtil.buildOkRequestResult()).build();
} catch (Exception e) {
- builder.setResultCode(ResultCode.ERROR);
- if (e.getMessage() == null) {
- builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
- }
- return builder.build();
+ responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+ e.getMessage() == null ? ExceptionUtils.getStackTrace(e) : null, null));
+ return responseBuilder.build();
}
} catch (Throwable t) {
throw new ServiceException(t);
@@ -460,7 +447,7 @@ public class TajoMasterClientService extends AbstractService {
builder.setQueryId(request.getQueryId());
if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
- builder.setResultCode(ResultCode.OK);
+ builder.setResult(IPCUtil.buildOkRequestResult());
builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
} else {
QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
@@ -474,7 +461,7 @@ public class TajoMasterClientService extends AbstractService {
}
if (queryInfo != null) {
- builder.setResultCode(ResultCode.OK);
+ builder.setResult(IPCUtil.buildOkRequestResult());
builder.setState(queryInfo.getQueryState());
boolean isCreateTable = queryInfo.getQueryContext().isCreateTable();
@@ -495,11 +482,11 @@ public class TajoMasterClientService extends AbstractService {
} else {
Session session = context.getSessionManager().getSession(request.getSessionId().getId());
if (session.getNonForwardQueryResultScanner(queryId) != null) {
- builder.setResultCode(ResultCode.OK);
+ builder.setResult(IPCUtil.buildOkRequestResult());
builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
} else {
- builder.setResultCode(ResultCode.ERROR);
- builder.setErrorMessage("No such query: " + queryId.toString());
+ builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+ "No such query: " + queryId.toString(), null));
}
}
}
@@ -531,17 +518,16 @@ public class TajoMasterClientService extends AbstractService {
resultSetBuilder.addAllSerializedTuples(rows);
builder.setResultSet(resultSetBuilder.build());
- builder.setResultCode(ResultCode.OK);
+ builder.setResult(IPCUtil.buildOkRequestResult());
LOG.info("Send result to client for " +
request.getSessionId().getId() + "," + queryId + ", " + rows.size() + " rows");
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
- builder.setResultCode(ResultCode.ERROR);
String errorMessage = t.getMessage() == null ? t.getClass().getName() : t.getMessage();
- builder.setErrorMessage(errorMessage);
- builder.setErrorTrace(org.apache.hadoop.util.StringUtils.stringifyException(t));
+ builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+ errorMessage, org.apache.hadoop.util.StringUtils.stringifyException(t)));
}
return builder.build();
}
@@ -581,11 +567,11 @@ public class TajoMasterClientService extends AbstractService {
if (queryInfo != null) {
builder.setQueryInfo(queryInfo.getProto());
}
- builder.setResultCode(ResultCode.OK);
+ builder.setResult(IPCUtil.buildOkRequestResult());
} catch (Throwable t) {
LOG.warn(t.getMessage(), t);
- builder.setResultCode(ResultCode.ERROR);
- builder.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(t));
+ builder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+ org.apache.hadoop.util.StringUtils.stringifyException(t), null));
}
return builder.build();
@@ -779,13 +765,13 @@ public class TajoMasterClientService extends AbstractService {
if (catalog.existsTable(databaseName, tableName)) {
return TableResponse.newBuilder()
- .setResultCode(ResultCode.OK)
+ .setResult(IPCUtil.buildOkRequestResult())
.setTableDesc(catalog.getTableDesc(databaseName, tableName).getProto())
.build();
} else {
return TableResponse.newBuilder()
- .setResultCode(ResultCode.ERROR)
- .setErrorMessage("ERROR: no such a table: " + request.getTableName())
+ .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+ "ERROR: no such a table: " + request.getTableName(), null))
.build();
}
} catch (Throwable t) {
@@ -821,21 +807,21 @@ public class TajoMasterClientService extends AbstractService {
meta, path, true, partitionDesc, false);
} catch (Exception e) {
return TableResponse.newBuilder()
- .setResultCode(ResultCode.ERROR)
- .setErrorMessage(e.getMessage()).build();
+ .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, e.getMessage(), null))
+ .build();
}
return TableResponse.newBuilder()
- .setResultCode(ResultCode.OK)
+ .setResult(IPCUtil.buildOkRequestResult())
.setTableDesc(desc.getProto()).build();
} catch (InvalidSessionException ise) {
return TableResponse.newBuilder()
- .setResultCode(ResultCode.ERROR)
- .setErrorMessage(ise.getMessage()).build();
+ .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, ise.getMessage(), null))
+ .build();
} catch (IOException ioe) {
return TableResponse.newBuilder()
- .setResultCode(ResultCode.ERROR)
- .setErrorMessage(ioe.getMessage()).build();
+ .setResult(IPCUtil.buildRequestResult(ResultCode.ERROR, ioe.getMessage(), null))
+ .build();
}
}
@@ -875,12 +861,184 @@ public class TajoMasterClientService extends AbstractService {
}
}
return FunctionResponse.newBuilder()
- .setResultCode(ResultCode.OK)
+ .setResult(IPCUtil.buildOkRequestResult())
.addAllFunctions(functionProtos)
.build();
} catch (Throwable t) {
throw new ServiceException(t);
}
}
+
+ @Override
+ public IndexDescProto getIndexWithName(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String indexName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ indexName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ indexName = request.getValue();
+ }
+ return catalog.getIndexByName(databaseName, indexName).getProto();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public BoolProto existIndexWithName(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String indexName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ indexName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ indexName = request.getValue();
+ }
+ return catalog.existIndexByName(databaseName, indexName) ?
+ ProtoUtil.TRUE : ProtoUtil.FALSE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public GetIndexesResponse getIndexesForTable(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String tableName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getValue();
+ }
+
+ GetIndexesResponse.Builder builder = GetIndexesResponse.newBuilder();
+ for (IndexDesc index : catalog.getAllIndexesByTable(databaseName, tableName)) {
+ builder.addIndexes(index.getProto());
+ }
+ builder.setResult(IPCUtil.buildOkRequestResult());
+ return builder.build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public BoolProto existIndexesForTable(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String tableName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getValue();
+ }
+ return catalog.existIndexesByTable(databaseName, tableName) ?
+ ProtoUtil.TRUE : ProtoUtil.FALSE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public GetIndexWithColumnsResponse getIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String tableName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getTableName())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getTableName();
+ }
+ String[] columnNames = new String[request.getColumnNamesCount()];
+ columnNames = request.getColumnNamesList().toArray(columnNames);
+
+ GetIndexWithColumnsResponse.Builder builder = GetIndexWithColumnsResponse.newBuilder();
+ builder.setResult(IPCUtil.buildOkRequestResult());
+ builder.setIndexDesc(catalog.getIndexByColumnNames(databaseName, tableName, columnNames).getProto());
+ return builder.build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public BoolProto existIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String tableName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getTableName())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getTableName();
+ }
+ String[] columnNames = new String[request.getColumnNamesCount()];
+ columnNames = request.getColumnNamesList().toArray(columnNames);
+ return catalog.existIndexByColumnNames(databaseName, tableName, columnNames) ?
+ ProtoUtil.TRUE : ProtoUtil.FALSE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public BoolProto dropIndex(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String indexName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ indexName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ indexName = request.getValue();
+ }
+ return catalog.dropIndex(databaseName, indexName) ?
+ ProtoUtil.TRUE : ProtoUtil.FALSE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index acbaa01..5b12438 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -68,45 +68,88 @@ public class DDLExecutor {
switch (root.getType()) {
- case ALTER_TABLESPACE:
- AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
- alterTablespace(context, queryContext, alterTablespace);
- return true;
-
-
- case CREATE_DATABASE:
- CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
- createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
- return true;
- case DROP_DATABASE:
- DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
- dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
- return true;
-
-
- case CREATE_TABLE:
- CreateTableNode createTable = (CreateTableNode) root;
- createTable(queryContext, createTable, createTable.isIfNotExists());
- return true;
- case DROP_TABLE:
- DropTableNode dropTable = (DropTableNode) root;
- dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
- return true;
- case TRUNCATE_TABLE:
- TruncateTableNode truncateTable = (TruncateTableNode) root;
- truncateTable(queryContext, truncateTable);
- return true;
-
- case ALTER_TABLE:
- AlterTableNode alterTable = (AlterTableNode) root;
- alterTable(context, queryContext, alterTable);
- return true;
+ case ALTER_TABLESPACE:
+ AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
+ alterTablespace(context, queryContext, alterTablespace);
+ return true;
+
+
+ case CREATE_DATABASE:
+ CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
+ createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+ return true;
+ case DROP_DATABASE:
+ DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
+ dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+ return true;
+
+
+ case CREATE_TABLE:
+ CreateTableNode createTable = (CreateTableNode) root;
+ createTable(queryContext, createTable, createTable.isIfNotExists());
+ return true;
+ case DROP_TABLE:
+ DropTableNode dropTable = (DropTableNode) root;
+ dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
+ return true;
+ case TRUNCATE_TABLE:
+ TruncateTableNode truncateTable = (TruncateTableNode) root;
+ truncateTable(queryContext, truncateTable);
+ return true;
+
+ case ALTER_TABLE:
+ AlterTableNode alterTable = (AlterTableNode) root;
+ alterTable(context, queryContext, alterTable);
+ return true;
+
+ case CREATE_INDEX:
+ // The catalog information for the created index is automatically updated when the query is successfully finished.
+ // See the Query.CreateIndexHook class.
+ return true;
+
+ case DROP_INDEX:
+ DropIndexNode dropIndexNode = (DropIndexNode) root;
+ dropIndex(queryContext, dropIndexNode);
+ return true;
default:
throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
}
}
+ public void dropIndex(final QueryContext queryContext, final DropIndexNode dropIndexNode) {
+ String databaseName, simpleIndexName;
+ if (CatalogUtil.isFQTableName(dropIndexNode.getIndexName())) {
+ String [] splits = CatalogUtil.splitFQTableName(dropIndexNode.getIndexName());
+ databaseName = splits[0];
+ simpleIndexName = splits[1];
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleIndexName = dropIndexNode.getIndexName();
+ }
+
+ if (!catalog.existIndexByName(databaseName, simpleIndexName)) {
+ throw new NoSuchIndexException(simpleIndexName);
+ }
+
+ IndexDesc desc = catalog.getIndexByName(databaseName, simpleIndexName);
+
+ if (!catalog.dropIndex(databaseName, simpleIndexName)) {
+ LOG.info("Cannot drop index \"" + simpleIndexName + "\".");
+ throw new CatalogException("Cannot drop index \"" + simpleIndexName + "\".");
+ }
+
+ Path indexPath = new Path(desc.getIndexPath());
+ try {
+ FileSystem fs = indexPath.getFileSystem(context.getConf());
+ fs.delete(indexPath, true);
+ } catch (IOException e) {
+ throw new InternalError(e.getMessage());
+ }
+
+ LOG.info("Index " + simpleIndexName + " is dropped.");
+ }
+
/**
* Alter a given table
*/
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2242445..26476a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -29,8 +29,10 @@ import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.exception.AlreadyExistsIndexException;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
@@ -40,6 +42,7 @@ import org.apache.tajo.engine.planner.physical.EvalExprExec;
import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
import org.apache.tajo.master.NonForwardQueryResultFileScanner;
import org.apache.tajo.master.NonForwardQueryResultScanner;
@@ -58,6 +61,7 @@ import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
import org.apache.tajo.storage.*;
+import org.apache.tajo.util.IPCUtil;
import org.apache.tajo.util.ProtoUtil;
import org.apache.tajo.worker.TaskAttemptContext;
@@ -98,10 +102,18 @@ public class QueryExecutor {
} else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
- ddlExecutor.execute(queryContext, plan);
- response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- response.setResultCode(ClientProtos.ResultCode.OK);
+ if (PlannerUtil.isDistExecDDL(rootNode)) {
+ if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) {
+ checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild());
+ }
+ executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
+ } else {
+ response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ response.setResult(IPCUtil.buildOkRequestResult());
+ }
+
+ ddlExecutor.execute(queryContext, plan);
} else if (plan.isExplain()) { // explain query
execExplain(plan, response);
@@ -142,8 +154,8 @@ public class QueryExecutor {
session.selectDatabase(setSessionNode.getValue());
} else {
response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- response.setResultCode(ClientProtos.ResultCode.ERROR);
- response.setErrorMessage("database \"" + databaseName + "\" does not exists.");
+ response.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+ "database \"" + databaseName + "\" does not exists.", null));
}
// others
@@ -157,7 +169,7 @@ public class QueryExecutor {
context.getSystemMetrics().counter("Query", "numDDLQuery").inc();
response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- response.setResultCode(ClientProtos.ResultCode.OK);
+ response.setResult(IPCUtil.buildOkRequestResult());
}
public void execExplain(LogicalPlan plan, SubmitQueryResponse.Builder response) throws IOException {
@@ -183,7 +195,7 @@ public class QueryExecutor {
response.setResultSet(serializedResBuilder.build());
response.setMaxRowNum(lines.length);
- response.setResultCode(ClientProtos.ResultCode.OK);
+ response.setResult(IPCUtil.buildOkRequestResult());
response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
}
@@ -205,7 +217,7 @@ public class QueryExecutor {
response.setQueryId(queryId.getProto());
response.setMaxRowNum(maxRow);
response.setTableDesc(queryResultScanner.getTableDesc().getProto());
- response.setResultCode(ClientProtos.ResultCode.OK);
+ response.setResult(IPCUtil.buildOkRequestResult());
}
public void execSimpleQuery(QueryContext queryContext, Session session, String query, LogicalPlan plan,
@@ -235,7 +247,7 @@ public class QueryExecutor {
response.setQueryId(queryId.getProto());
response.setMaxRowNum(maxRow);
response.setTableDesc(desc.getProto());
- response.setResultCode(ClientProtos.ResultCode.OK);
+ response.setResult(IPCUtil.buildOkRequestResult());
}
public void execNonFromQuery(QueryContext queryContext, Session session, String query,
@@ -267,7 +279,7 @@ public class QueryExecutor {
responseBuilder.setResultSet(serializedResBuilder);
responseBuilder.setMaxRowNum(1);
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ responseBuilder.setResult(IPCUtil.buildOkRequestResult());
}
}
@@ -369,7 +381,7 @@ public class QueryExecutor {
// If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
responseBuilder.setMaxRowNum(-1);
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ responseBuilder.setResult(IPCUtil.buildOkRequestResult());
}
public void executeDistributedQuery(QueryContext queryContext, Session session,
@@ -398,13 +410,13 @@ public class QueryExecutor {
if(queryInfo == null) {
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
- responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+ responseBuilder.setResult(IPCUtil.buildRequestResult(ResultCode.ERROR,
+ "Fail starting QueryMaster.", null));
LOG.error("Fail starting QueryMaster: " + sql);
} else {
responseBuilder.setIsForwarded(true);
responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
- responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+ responseBuilder.setResult(IPCUtil.buildOkRequestResult());
if(queryInfo.getQueryMasterHost() != null) {
responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
}
@@ -413,4 +425,23 @@ public class QueryExecutor {
" is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
}
}
+
+ private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
+ throws IOException {
+ String databaseName, simpleIndexName, qualifiedIndexName;
+ if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+ String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+ databaseName = splits[0];
+ simpleIndexName = splits[1];
+ qualifiedIndexName = createIndexNode.getIndexName();
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleIndexName = createIndexNode.getIndexName();
+ qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+ }
+
+ if (catalog.existIndexByName(databaseName, simpleIndexName)) {
+ throw new AlreadyExistsIndexException(qualifiedIndexName);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index a626df1..88ecebc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -32,6 +32,8 @@ import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.CatalogException;
import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
@@ -472,6 +474,7 @@ public class Query implements EventHandler<QueryEvent> {
hookList.add(new MaterializedResultHook());
hookList.add(new CreateTableHook());
hookList.add(new InsertTableHook());
+ hookList.add(new CreateIndexHook());
}
public void execute(QueryContext queryContext, Query query,
@@ -485,6 +488,48 @@ public class Query implements EventHandler<QueryEvent> {
}
}
+ private class CreateIndexHook implements QueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
+ Stage lastStage = query.getStage(finalExecBlockId);
+ return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_INDEX;
+ }
+
+ @Override
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+ CatalogService catalog = context.getWorkerContext().getCatalog();
+ Stage lastStage = query.getStage(finalExecBlockId);
+
+ CreateIndexNode createIndexNode = (CreateIndexNode) lastStage.getBlock().getPlan();
+ String databaseName, simpleIndexName, qualifiedIndexName;
+ if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+ String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+ databaseName = splits[0];
+ simpleIndexName = splits[1];
+ qualifiedIndexName = createIndexNode.getIndexName();
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleIndexName = createIndexNode.getIndexName();
+ qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+ }
+ ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN);
+ if (scanNode == null) {
+ throw new IOException("Cannot find the table of the relation");
+ }
+ IndexDesc indexDesc = new IndexDesc(databaseName, scanNode.getTableName(),
+ simpleIndexName, createIndexNode.getIndexPath(),
+ createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(),
+ createIndexNode.isUnique(), false, scanNode.getLogicalSchema());
+ if (catalog.createIndex(indexDesc)) {
+ LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + ".");
+ } else {
+ LOG.info("Index creation " + qualifiedIndexName + " is failed.");
+ throw new CatalogException("Cannot create index \"" + qualifiedIndexName + "\".");
+ }
+ }
+ }
+
private class MaterializedResultHook implements QueryHook {
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 9c789a5..a125415 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -38,6 +38,12 @@ import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.UnimplementedException;
@@ -48,15 +54,9 @@ import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.session.Session;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.verifier.VerifyException;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
@@ -347,9 +347,10 @@ public class QueryMasterTask extends CompositeService {
LOG.warn("Query already started");
return;
}
+ LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
jsonExpr = null; // remove the possible OOM
plan = planner.createPlan(queryContext, expr);
@@ -393,6 +394,14 @@ public class QueryMasterTask extends CompositeService {
tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
}
}
+
+ scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN);
+ if (scanNodes != null) {
+ for (LogicalNode eachScanNode : scanNodes) {
+ ScanNode scanNode = (ScanNode) eachScanNode;
+ tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+ }
+ }
}
MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
queryMasterContext.getGlobalPlanner().build(masterPlan);
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java
new file mode 100644
index 0000000..dcffe62
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/util/IPCUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.ipc.ClientProtos.RequestResult;
+import org.apache.tajo.ipc.ClientProtos.ResultCode;
+
+public class IPCUtil {
+
+ public static RequestResult buildOkRequestResult() {
+ return buildRequestResult(ResultCode.OK, null, null);
+ }
+
+ public static RequestResult buildRequestResult(ResultCode code,
+ @Nullable String errorMessage,
+ @Nullable String errorTrace) {
+ RequestResult.Builder builder = RequestResult.newBuilder();
+ builder.setResultCode(code);
+ if (errorMessage != null) {
+ builder.setErrorMessage(errorMessage);
+ }
+ if (errorTrace != null) {
+ builder.setErrorTrace(errorTrace);
+ }
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
deleted file mode 100644
index 3147bb6..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.util;
-
-import com.google.gson.Gson;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.logical.IndexScanNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-public class IndexUtil {
- public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
- StringBuilder builder = new StringBuilder();
- builder.append(fragment.getPath().getName() + "_");
- builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_");
- for(int i = 0 ; i < keys.length ; i ++) {
- builder.append(keys[i].getSortKey().getSimpleName()+"_");
- }
- builder.append("_index");
- return builder.toString();
-
- }
-
- public static String getIndexName(String indexName , SortSpec[] keys) {
- StringBuilder builder = new StringBuilder();
- builder.append(indexName + "_");
- for(int i = 0 ; i < keys.length ; i ++) {
- builder.append(keys[i].getSortKey().getSimpleName() + "_");
- }
- return builder.toString();
- }
-
- public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode,
- Iterator<Entry<String, String>> iter ) {
-
- EvalNode qual = scanNode.getQual();
- Gson gson = CoreGsonHelper.getInstance();
-
- FieldAndValueFinder nodeFinder = new FieldAndValueFinder();
- qual.preOrder(nodeFinder);
- LinkedList<BinaryEval> nodeList = nodeFinder.getNodeList();
-
- int maxSize = Integer.MIN_VALUE;
- SortSpec[] maxIndex = null;
-
- String json;
- while(iter.hasNext()) {
- Entry<String , String> entry = iter.next();
- json = entry.getValue();
- SortSpec[] sortKey = gson.fromJson(json, SortSpec[].class);
- if(sortKey.length > nodeList.size()) {
- /* If the number of the sort key is greater than where condition,
- * this index cannot be used
- * */
- continue;
- } else {
- boolean[] equal = new boolean[sortKey.length];
- for(int i = 0 ; i < sortKey.length ; i ++) {
- for(int j = 0 ; j < nodeList.size() ; j ++) {
- Column col = ((FieldEval)(nodeList.get(j).getLeftExpr())).getColumnRef();
- if(col.equals(sortKey[i].getSortKey())) {
- equal[i] = true;
- }
- }
- }
- boolean chk = true;
- for(int i = 0 ; i < equal.length ; i ++) {
- chk = chk && equal[i];
- }
- if(chk) {
- if(maxSize < sortKey.length) {
- maxSize = sortKey.length;
- maxIndex = sortKey;
- }
- }
- }
- }
- if(maxIndex == null) {
- return null;
- } else {
- Schema keySchema = new Schema();
- for(int i = 0 ; i < maxIndex.length ; i ++ ) {
- keySchema.addColumn(maxIndex[i].getSortKey());
- }
- Datum[] datum = new Datum[nodeList.size()];
- for(int i = 0 ; i < nodeList.size() ; i ++ ) {
- datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue();
- }
-
- return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex);
- }
-
- }
-
-
- private static class FieldAndValueFinder implements EvalNodeVisitor {
- private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>();
-
- public LinkedList<BinaryEval> getNodeList () {
- return this.nodeList;
- }
-
- @Override
- public void visit(EvalNode node) {
- BinaryEval binaryEval = (BinaryEval) node;
- switch(node.getType()) {
- case AND:
- break;
- case EQUAL:
- if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
- && binaryEval.getRightExpr().getType() == EvalType.CONST ) {
- nodeList.add(binaryEval);
- }
- break;
- case IS_NULL:
- if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
- && binaryEval.getRightExpr().getType() == EvalType.CONST) {
- nodeList.add(binaryEval);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index da25fe6..cd9e6ef 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -304,7 +304,7 @@ public class QueryExecutorServlet extends HttpServlet {
LOG.error("Internal Error: SubmissionResponse is NULL");
error = new Exception("Internal Error: SubmissionResponse is NULL");
- } else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+ } else if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
if (response.getIsForwarded()) {
queryId = new QueryId(response.getQueryId());
getQueryResult(queryId);
@@ -316,9 +316,9 @@ public class QueryExecutorServlet extends HttpServlet {
progress.set(100);
}
- } else if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
- if (response.hasErrorMessage()) {
- StringBuffer errorMessage = new StringBuffer(response.getErrorMessage());
+ } else if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
+ if (response.getResult().hasErrorMessage()) {
+ StringBuffer errorMessage = new StringBuffer(response.getResult().getErrorMessage());
String modifiedMessage;
if (errorMessage.length() > 200) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 1c83110..6a13898 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -25,10 +25,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
import org.apache.tajo.QueryId;
+import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
+import org.apache.tajo.ipc.ClientProtos.RequestResult;
import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -129,14 +132,32 @@ public class TajoWorkerClientService extends AbstractService {
if (queryHistory != null) {
builder.setQueryHistory(queryHistory.getProto());
}
- builder.setResultCode(ResultCode.OK);
+ builder.setResult(buildOkRequestResult());
} catch (Throwable t) {
LOG.warn(t.getMessage(), t);
- builder.setResultCode(ResultCode.ERROR);
- builder.setErrorMessage(org.apache.hadoop.util.StringUtils.stringifyException(t));
+ builder.setResult(buildRequestResult(ResultCode.ERROR,
+ StringUtils.stringifyException(t), null));
}
return builder.build();
}
+
+ private RequestResult buildOkRequestResult() {
+ return buildRequestResult(ResultCode.OK, null, null);
+ }
+
+ private RequestResult buildRequestResult(ResultCode code,
+ @Nullable String errorMessage,
+ @Nullable String errorTrace) {
+ RequestResult.Builder builder = RequestResult.newBuilder();
+ builder.setResultCode(code);
+ if (errorMessage != null) {
+ builder.setErrorMessage(errorMessage);
+ }
+ if (errorTrace != null) {
+ builder.setErrorTrace(errorTrace);
+ }
+ return builder.build();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 5f9c6ac..b784c64 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -125,21 +125,9 @@ public class Task {
this.inputStats = new TableStats();
plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan());
- LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
- if (scanNode != null) {
- for (LogicalNode node : scanNode) {
- ScanNode scan = (ScanNode) node;
- descs.put(scan.getCanonicalName(), scan.getTableDesc());
- }
- }
-
- LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
- if (partitionScanNode != null) {
- for (LogicalNode node : partitionScanNode) {
- PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
- descs.put(scan.getCanonicalName(), scan.getTableDesc());
- }
- }
+ updateDescsForScanNodes(NodeType.SCAN);
+ updateDescsForScanNodes(NodeType.PARTITIONS_SCAN);
+ updateDescsForScanNodes(NodeType.INDEX_SCAN);
interQuery = request.getProto().getInterQuery();
if (interQuery) {
@@ -181,6 +169,17 @@ public class Task {
LOG.info("==================================");
}
+ private void updateDescsForScanNodes(NodeType nodeType) {
+ assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN;
+ LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType);
+ if (scanNodes != null) {
+ for (LogicalNode node : scanNodes) {
+ ScanNode scanNode = (ScanNode) node;
+ descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+ }
+ }
+ }
+
public void init() throws IOException {
if (context.getState() == TaskAttemptState.TA_PENDING) {
// initialize a task temporal dir
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index dfc8a9b..6c0af89 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -379,12 +379,15 @@ public class TaskAttemptContext {
return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
}
- public long getUniqueKeyFromFragments() {
- List<FragmentProto> totalFragments = new ArrayList<FragmentProto>();
- for (List<FragmentProto> eachFragments : fragmentMap.values()) {
- totalFragments.addAll(eachFragments);
+ public String getUniqueKeyFromFragments() {
+ StringBuilder sb = new StringBuilder();
+ for (List<FragmentProto> fragments : fragmentMap.values()) {
+ for (FragmentProto f : fragments) {
+ FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f);
+ sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
+ }
}
- return Objects.hashCode(totalFragments.toArray(new FragmentProto[totalFragments.size()]));
+ return sb.toString();
}
public int hashCode() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 4e4b710..615dbb9 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -36,11 +36,11 @@ import org.apache.tajo.engine.codegen.TajoClassLoader;
import org.apache.tajo.engine.function.FunctionLoader;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.plan.*;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.serder.EvalNodeDeserializer;
import org.apache.tajo.plan.serder.EvalNodeSerializer;
-import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
@@ -98,7 +98,7 @@ public class ExprTestBase {
analyzer = new SQLAnalyzer();
preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
planner = new LogicalPlanner(cat);
- optimizer = new LogicalOptimizer(util.getConfiguration());
+ optimizer = new LogicalOptimizer(util.getConfiguration(), cat);
annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index c9b52fd..3122b25 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -104,9 +104,9 @@ public class TestLogicalOptimizer {
catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer(util.getConfiguration());
defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
+ optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 996d736..0bf4602 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -497,7 +497,7 @@ public class TestLogicalPlanner {
Schema expected = tpch.getOutSchema("q2");
assertSchema(expected, node.getOutSchema());
- LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+ LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
optimizer.optimize(plan);
LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.JOIN);
@@ -536,7 +536,7 @@ public class TestLogicalPlanner {
LogicalNode node = plan.getRootBlock().getRoot();
testJsonSerDerObject(node);
- LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+ LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
optimizer.optimize(plan);
LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN);
@@ -577,7 +577,7 @@ public class TestLogicalPlanner {
LogicalNode node = plan.getRootBlock().getRoot();
testJsonSerDerObject(node);
- LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+ LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
optimizer.optimize(plan);
LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN);
@@ -624,7 +624,7 @@ public class TestLogicalPlanner {
LogicalNode node = plan.getRootBlock().getRoot();
testJsonSerDerObject(node);
- LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+ LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
optimizer.optimize(plan);
Map<BinaryEval, Boolean> scanMap = TUtil.newHashMap();
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
index 3803c7a..9f20776 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -180,7 +180,7 @@ public class TestBroadcastJoinPlan {
"join small2 on small1_id = small2_id";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -241,7 +241,7 @@ public class TestBroadcastJoinPlan {
"join small3 on small1_id = small3_id";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -305,7 +305,7 @@ public class TestBroadcastJoinPlan {
"join large2 on large1_id = large2_id ";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -333,7 +333,7 @@ public class TestBroadcastJoinPlan {
"join small2 on large2_id = small2_id";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -383,7 +383,7 @@ public class TestBroadcastJoinPlan {
"join small2 on a.small1_id = small2_id";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -424,7 +424,7 @@ public class TestBroadcastJoinPlan {
"join (select * from small1) a on large1_id = a.small1_id";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -480,7 +480,7 @@ public class TestBroadcastJoinPlan {
"left outer join large2 on small1_id = large2_id ";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -534,7 +534,7 @@ public class TestBroadcastJoinPlan {
"left outer join small3 on large1_id = small3_id ";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -617,7 +617,7 @@ public class TestBroadcastJoinPlan {
"left outer join small3 on large3_id = small3_id ";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -700,7 +700,7 @@ public class TestBroadcastJoinPlan {
"left outer join small3 on small1_id = small3_id ";
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -759,7 +759,7 @@ public class TestBroadcastJoinPlan {
"left outer join small3 on small1_id = small3_id " ;
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -812,7 +812,7 @@ public class TestBroadcastJoinPlan {
"left outer join small3 on small1_id = small3_id " ;
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -904,7 +904,7 @@ public class TestBroadcastJoinPlan {
"left outer join small3 on small3_id = large1_id " ;
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -969,7 +969,7 @@ public class TestBroadcastJoinPlan {
"left outer join small2 on large1_id = small2_id " ;
LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
Expr expr = analyzer.parse(query);
LogicalPlan plan = planner.createPlan(defaultContext, expr);
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
deleted file mode 100644
index c897461..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.Stack;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-
-public class TestBSTIndexExec {
-
- private TajoConf conf;
- private Path idxPath;
- private CatalogService catalog;
- private SQLAnalyzer analyzer;
- private LogicalPlanner planner;
- private LogicalOptimizer optimizer;
- private FileStorageManager sm;
- private Schema idxSchema;
- private BaseTupleComparator comp;
- private BSTIndex.BSTIndexWriter writer;
- private HashMap<Integer , Integer> randomValues ;
- private int rndKey = -1;
- private FileSystem fs;
- private TableMeta meta;
- private Path tablePath;
-
- private Random rnd = new Random(System.currentTimeMillis());
-
- private TajoTestingCluster util;
-
- @Before
- public void setup() throws Exception {
- this.randomValues = new HashMap<Integer, Integer>();
- this.conf = new TajoConf();
- util = new TajoTestingCluster();
- util.startCatalogCluster();
- catalog = util.getMiniCatalogCluster().getCatalog();
-
- Path workDir = CommonTestingUtil.getTestDir();
- catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
- catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
- sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir);
-
- idxPath = new Path(workDir, "test.idx");
-
- Schema schema = new Schema();
- schema.addColumn("managerid", Type.INT4);
- schema.addColumn("empid", Type.INT4);
- schema.addColumn("deptname", Type.TEXT);
-
- this.idxSchema = new Schema();
- idxSchema.addColumn("managerid", Type.INT4);
- SortSpec[] sortKeys = new SortSpec[1];
- sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
- this.comp = new BaseTupleComparator(idxSchema, sortKeys);
-
- this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
- BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);
- writer.setLoadNum(100);
- writer.open();
- long offset;
-
- meta = CatalogUtil.newTableMeta(StoreType.CSV);
- tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv");
- fs = tablePath.getFileSystem(conf);
- fs.mkdirs(tablePath.getParent());
-
- FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath);
- appender.init();
- Tuple tuple = new VTuple(schema.size());
- for (int i = 0; i < 10000; i++) {
-
- Tuple key = new VTuple(this.idxSchema.size());
- int rndKey = rnd.nextInt(250);
- if(this.randomValues.containsKey(rndKey)) {
- int t = this.randomValues.remove(rndKey) + 1;
- this.randomValues.put(rndKey, t);
- } else {
- this.randomValues.put(rndKey, 1);
- }
-
- key.put(new Datum[] { DatumFactory.createInt4(rndKey) });
- tuple.put(new Datum[] { DatumFactory.createInt4(rndKey),
- DatumFactory.createInt4(rnd.nextInt(10)),
- DatumFactory.createText("dept_" + rnd.nextInt(10)) });
- offset = appender.getOffset();
- appender.addTuple(tuple);
- writer.write(key, offset);
- }
- appender.flush();
- appender.close();
- writer.close();
-
- TableDesc desc = new TableDesc(
- CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta,
- sm.getTablePath("employee").toUri());
- catalog.createTable(desc);
-
- analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer(conf);
- }
-
- @After
- public void tearDown() {
- util.shutdownCatalogCluster();
- }
-
- @Test
- public void testEqual() throws Exception {
- this.rndKey = rnd.nextInt(250);
- final String QUERY = "select * from employee where managerId = " + rndKey;
-
- FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
- Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
- LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
- Expr expr = analyzer.parse(QUERY);
- LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
- LogicalNode rootNode = optimizer.optimize(plan);
-
- TmpPlanner phyPlanner = new TmpPlanner(conf);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
- int tupleCount = this.randomValues.get(rndKey);
- int counter = 0;
- exec.init();
- while (exec.next() != null) {
- counter ++;
- }
- exec.close();
- assertEquals(tupleCount , counter);
- }
-
- private class TmpPlanner extends PhysicalPlannerImpl {
- public TmpPlanner(TajoConf conf) {
- super(conf);
- }
-
- @Override
- public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
- throws IOException {
- Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
- "Error: There is no table matched to %s", scanNode.getTableName());
-
- List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName()));
-
- Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
-
- return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum);
-
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 64da88b..a64b525 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -63,6 +63,7 @@ public class TestHashAntiJoinExec {
private LogicalOptimizer optimizer;
private StorageManager sm;
private Path testDir;
+ private QueryContext queryContext;
private TableDesc employee;
private TableDesc people;
@@ -128,11 +129,12 @@ public class TestHashAntiJoinExec {
appender.flush();
appender.close();
+ queryContext = new QueryContext(conf);
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
}
@After
@@ -159,7 +161,7 @@ public class TestHashAntiJoinExec {
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashAntiJoin");
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 4e218c5..196f3bf 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -64,6 +64,7 @@ public class TestHashSemiJoinExec {
private LogicalOptimizer optimizer;
private StorageManager sm;
private Path testDir;
+ private QueryContext queryContext;
private TableDesc employee;
private TableDesc people;
@@ -133,11 +134,12 @@ public class TestHashSemiJoinExec {
appender.flush();
appender.close();
+ queryContext = new QueryContext(conf);
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
}
@After
@@ -164,7 +166,7 @@ public class TestHashSemiJoinExec {
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testHashSemiJoin");
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index a4e49f7..9de58a3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -171,12 +171,13 @@ public class TestPhysicalPlanner {
}
appender.flush();
appender.close();
+
+ defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
catalog.createTable(score);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
- defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
createLargeScoreTable();
http://git-wip-us.apache.org/repos/asf/tajo/blob/071c5d05/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 2e093c1..84abfff 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -61,6 +61,7 @@ public class TestSortExec {
private static Path workDir;
private static Path tablePath;
private static TableMeta employeeMeta;
+ private static QueryContext queryContext;
private static Random rnd = new Random(System.currentTimeMillis());
@@ -101,9 +102,10 @@ public class TestSortExec {
tablePath.toUri());
catalog.createTable(desc);
+ queryContext = new QueryContext(conf);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
}
public static String[] QUERIES = {
@@ -113,7 +115,7 @@ public class TestSortExec {
public final void testNext() throws IOException, PlanningException {
FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
LocalTajoTestingUtility
.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
ctx.setEnforcer(new Enforcer());