You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:19 UTC
[11/13] tajo git commit: TAJO-1288: Refactoring
org.apache.tajo.master package.
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
new file mode 100644
index 0000000..f645dc5
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.exec;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.codegen.CompilationError;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster.MasterContext;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import com.google.protobuf.ByteString;
+
+public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
+
+ private final Log LOG = LogFactory.getLog(getClass());
+
+ private MasterContext masterContext;
+ private LogicalPlan logicalPlan;
+ private final QueryId queryId;
+ private final String sessionId;
+ private TaskAttemptContext taskContext;
+ private int currentRow;
+ private long maxRow;
+ private TableDesc tableDesc;
+ private Schema outSchema;
+ private RowStoreEncoder encoder;
+ private PhysicalExec physicalExec;
+
+ public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId,
+ String sessionId, int maxRow) {
+ masterContext = context;
+ logicalPlan = plan;
+ this.queryId = queryId;
+ this.sessionId = sessionId;
+ this.maxRow = maxRow;
+
+ }
+
+ @Override
+ public void init() throws IOException {
+ QueryContext queryContext = new QueryContext(masterContext.getConf());
+ currentRow = 0;
+
+ MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
+ GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
+ try {
+ globalPlanner.build(masterPlan);
+ } catch (PlanningException e) {
+ throw new RuntimeException(e);
+ }
+
+ ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
+ ExecutionBlock leafBlock = null;
+ while (cursor.hasNext()) {
+ ExecutionBlock block = cursor.nextBlock();
+ if (masterPlan.isLeaf(block)) {
+ leafBlock = block;
+ break;
+ }
+ }
+
+ taskContext = new TaskAttemptContext(queryContext, null,
+ new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0),
+ null, null);
+ physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf())
+ .createPlan(taskContext, leafBlock.getPlan());
+
+ tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(),
+ new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null);
+ outSchema = physicalExec.getSchema();
+ encoder = RowStoreUtil.createEncoder(getLogicalSchema());
+
+ physicalExec.init();
+ }
+
+ @Override
+ public void close() throws Exception {
+ tableDesc = null;
+ outSchema = null;
+ encoder = null;
+ if (physicalExec != null) {
+ try {
+ physicalExec.close();
+ } catch (Exception ignored) {}
+ }
+ physicalExec = null;
+ currentRow = -1;
+ }
+
+ private List<Tuple> getTablespaces(Schema outSchema) {
+ List<TablespaceProto> tablespaces = masterContext.getCatalog().getAllTablespaces();
+ List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TablespaceProto tablespace: tablespaces) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+ if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
+ if (tablespace.hasId()) {
+ aTuple.put(fieldId, DatumFactory.createInt4(tablespace.getId()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("space_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName()));
+ } else if ("space_handler".equalsIgnoreCase(column.getSimpleName())) {
+ if (tablespace.hasHandler()) {
+ aTuple.put(fieldId, DatumFactory.createText(tablespace.getHandler()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("space_uri".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri()));
+ }
+ }
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getDatabases(Schema outSchema) {
+ List<DatabaseProto> databases = masterContext.getCatalog().getAllDatabases();
+ List<Tuple> tuples = new ArrayList<Tuple>(databases.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (DatabaseProto database: databases) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+ if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(database.getId()));
+ } else if ("db_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(database.getName()));
+ } else if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
+ if (database.hasSpaceId()) {
+ aTuple.put(fieldId, DatumFactory.createInt4(database.getSpaceId()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getTables(Schema outSchema) {
+ List<TableDescriptorProto> tables = masterContext.getCatalog().getAllTables();
+ List<Tuple> tuples = new ArrayList<Tuple>(tables.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TableDescriptorProto table: tables) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+ if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(table.getTid()));
+ } else if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(table.getDbId()));
+ } else if ("table_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(table.getName()));
+ } else if ("table_type".equalsIgnoreCase(column.getSimpleName())) {
+ if (table.hasTableType()) {
+ aTuple.put(fieldId, DatumFactory.createText(table.getTableType()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(table.getPath()));
+ } else if ("store_type".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(table.getStoreType()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getColumns(Schema outSchema) {
+ List<ColumnProto> columnsList = masterContext.getCatalog().getAllColumns();
+ List<Tuple> tuples = new ArrayList<Tuple>(columnsList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+ int columnId = 1, prevtid = -1, tid = 0;
+
+ for (ColumnProto column: columnsList) {
+ aTuple = new VTuple(outSchema.size());
+
+ tid = column.getTid();
+ if (prevtid != tid) {
+ columnId = 1;
+ prevtid = tid;
+ }
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column colObj = columns.get(fieldId);
+
+ if ("tid".equalsIgnoreCase(colObj.getSimpleName())) {
+ if (column.hasTid()) {
+ aTuple.put(fieldId, DatumFactory.createInt4(tid));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("column_name".equalsIgnoreCase(colObj.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(column.getName()));
+ } else if ("ordinal_position".equalsIgnoreCase(colObj.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(columnId));
+ } else if ("data_type".equalsIgnoreCase(colObj.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(column.getDataType().getType().toString()));
+ } else if ("type_length".equalsIgnoreCase(colObj.getSimpleName())) {
+ DataType dataType = column.getDataType();
+ if (dataType.hasLength()) {
+ aTuple.put(fieldId, DatumFactory.createInt4(dataType.getLength()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ }
+ }
+
+ columnId++;
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getIndexes(Schema outSchema) {
+ List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
+ List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (IndexProto index: indexList) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+
+ if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
+ } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
+ } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
+ } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
+ } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
+ } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
+ } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
+ } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
+ } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getAllTableOptions(Schema outSchema) {
+ List<TableOptionProto> optionList = masterContext.getCatalog().getAllTableOptions();
+ List<Tuple> tuples = new ArrayList<Tuple>(optionList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TableOptionProto option: optionList) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+
+ if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(option.getTid()));
+ } else if ("key_".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getKey()));
+ } else if ("value_".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getValue()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getAllTableStats(Schema outSchema) {
+ List<TableStatsProto> statList = masterContext.getCatalog().getAllTableStats();
+ List<Tuple> tuples = new ArrayList<Tuple>(statList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TableStatsProto stat: statList) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+
+ if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(stat.getTid()));
+ } else if ("num_rows".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumRows()));
+ } else if ("num_bytes".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumBytes()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getAllPartitions(Schema outSchema) {
+ List<TablePartitionProto> partitionList = masterContext.getCatalog().getAllPartitions();
+ List<Tuple> tuples = new ArrayList<Tuple>(partitionList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TablePartitionProto partition: partitionList) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+
+ if ("pid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid()));
+ } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid()));
+ } else if ("partition_name".equalsIgnoreCase(column.getSimpleName())) {
+ if (partition.hasPartitionName()) {
+ aTuple.put(fieldId, DatumFactory.createText(partition.getPartitionName()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("ordinal_position".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition()));
+ } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(partition.getPath()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
+ List<Tuple> tuples = null;
+ String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
+
+ if ("tablespace".equalsIgnoreCase(tableName)) {
+ tuples = getTablespaces(inSchema);
+ } else if ("databases".equalsIgnoreCase(tableName)) {
+ tuples = getDatabases(inSchema);
+ } else if ("tables".equalsIgnoreCase(tableName)) {
+ tuples = getTables(inSchema);
+ } else if ("columns".equalsIgnoreCase(tableName)) {
+ tuples = getColumns(inSchema);
+ } else if ("indexes".equalsIgnoreCase(tableName)) {
+ tuples = getIndexes(inSchema);
+ } else if ("table_options".equalsIgnoreCase(tableName)) {
+ tuples = getAllTableOptions(inSchema);
+ } else if ("table_stats".equalsIgnoreCase(tableName)) {
+ tuples = getAllTableStats(inSchema);
+ } else if ("partitions".equalsIgnoreCase(tableName)) {
+ tuples = getAllPartitions(inSchema);
+ }
+
+ return tuples;
+ }
+
+ @Override
+ public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+ List<ByteString> rows = new ArrayList<ByteString>();
+ int startRow = currentRow;
+ int endRow = startRow + fetchRowNum;
+
+ if (physicalExec == null) {
+ return rows;
+ }
+
+ while (currentRow < endRow) {
+ Tuple currentTuple = physicalExec.next();
+
+ if (currentTuple == null) {
+ physicalExec.close();
+ physicalExec = null;
+ break;
+ }
+
+ currentRow++;
+ rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple)));
+
+ if (currentRow >= maxRow) {
+ physicalExec.close();
+ physicalExec = null;
+ break;
+ }
+ }
+
+ return rows;
+ }
+
+ @Override
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ @Override
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ @Override
+ public TableDesc getTableDesc() {
+ return tableDesc;
+ }
+
+ @Override
+ public Schema getLogicalSchema() {
+ return outSchema;
+ }
+
+ class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {
+
+ public SimplePhysicalPlannerImpl(TajoConf conf) {
+ super(conf);
+ }
+
+ @Override
+ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+ throws IOException {
+ return new SystemPhysicalExec(ctx, scanNode);
+ }
+
+ @Override
+ public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException {
+ return new SystemPhysicalExec(ctx, annotation);
+ }
+ }
+
+ class SystemPhysicalExec extends PhysicalExec {
+
+ private ScanNode scanNode;
+ private EvalNode qual;
+ private Projector projector;
+ private TableStats tableStats;
+ private final List<Tuple> cachedData;
+ private int currentRow;
+ private boolean isClosed;
+
+ public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) {
+ super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+ this.scanNode = scanNode;
+ this.qual = this.scanNode.getQual();
+ cachedData = TUtil.newList();
+ currentRow = 0;
+ isClosed = false;
+
+ projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ Tuple aTuple = null;
+ Tuple outTuple = new VTuple(outColumnNum);
+
+ if (isClosed) {
+ return null;
+ }
+
+ if (cachedData.size() == 0) {
+ rescan();
+ }
+
+ if (!scanNode.hasQual()) {
+ if (currentRow < cachedData.size()) {
+ aTuple = cachedData.get(currentRow++);
+ projector.eval(aTuple, outTuple);
+ outTuple.setOffset(aTuple.getOffset());
+ return outTuple;
+ }
+ return null;
+ } else {
+ while (currentRow < cachedData.size()) {
+ aTuple = cachedData.get(currentRow++);
+ if (qual.eval(inSchema, aTuple).isTrue()) {
+ projector.eval(aTuple, outTuple);
+ return outTuple;
+ }
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ cachedData.clear();
+ cachedData.addAll(fetchSystemTable(scanNode.getTableDesc(), inSchema));
+
+ tableStats = new TableStats();
+ tableStats.setNumRows(cachedData.size());
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanNode = null;
+ qual = null;
+ projector = null;
+ cachedData.clear();
+ currentRow = -1;
+ isClosed = true;
+ }
+
+ @Override
+ public float getProgress() {
+ return 1.0f;
+ }
+
+ @Override
+ protected void compile() throws CompilationError {
+ if (scanNode.hasQual()) {
+ qual = context.getPrecompiledEval(inSchema, qual);
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ return tableStats;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2242445..2fbebc1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -41,15 +41,12 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
-import org.apache.tajo.master.NonForwardQueryResultFileScanner;
-import org.apache.tajo.master.NonForwardQueryResultScanner;
-import org.apache.tajo.master.NonForwardQueryResultSystemScanner;
-import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.*;
import org.apache.tajo.master.exec.prehook.CreateTableHook;
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
import org.apache.tajo.master.exec.prehook.InsertIntoHook;
-import org.apache.tajo.master.querymaster.*;
-import org.apache.tajo.master.session.Session;
+import org.apache.tajo.querymaster.*;
+import org.apache.tajo.session.Session;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.Target;
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
deleted file mode 100644
index 3d6669c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAService.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.ha;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * The HAService is responsible for setting active TajoMaster on startup or when the
- * current active is changing (eg due to failure), monitoring the health of TajoMaster.
- *
- */
-public interface HAService {
-
- /**
- * Add master name to shared storage.
- */
- public void register() throws IOException;
-
-
- /**
- * Delete master name to shared storage.
- *
- */
- public void delete() throws IOException;
-
- /**
- *
- * @return True if current master is an active master.
- */
- public boolean isActiveStatus();
-
- /**
- *
- * @return return all master list
- * @throws IOException
- */
- public List<TajoMasterInfo> getMasters() throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
deleted file mode 100644
index 45219b3..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.ha;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAConstants;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster.
- *
- */
-public class HAServiceHDFSImpl implements HAService {
- private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
-
- private MasterContext context;
- private TajoConf conf;
-
- private FileSystem fs;
-
- private String masterName;
- private Path rootPath;
- private Path haPath;
- private Path activePath;
- private Path backupPath;
-
- private boolean isActiveStatus = false;
-
- //thread which runs periodically to see the last time since a heartbeat is received.
- private Thread checkerThread;
- private volatile boolean stopped = false;
-
- private int monitorInterval;
-
- private String currentActiveMaster;
-
- public HAServiceHDFSImpl(MasterContext context) throws IOException {
- this.context = context;
- this.conf = context.getConf();
- initSystemDirectory();
-
- InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress();
- this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort();
-
- monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
- }
-
- private void initSystemDirectory() throws IOException {
- // Get Tajo root dir
- this.rootPath = TajoConf.getTajoRootDir(conf);
-
- // Check Tajo root dir
- this.fs = rootPath.getFileSystem(conf);
-
- // Check and create Tajo system HA dir
- haPath = TajoConf.getSystemHADir(conf);
- if (!fs.exists(haPath)) {
- fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA dir '" + haPath + "' is created");
- }
-
- activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
- if (!fs.exists(activePath)) {
- fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA Active dir '" + activePath + "' is created");
- }
-
- backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
- if (!fs.exists(backupPath)) {
- fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
- LOG.info("System HA Backup dir '" + backupPath + "' is created");
- }
- }
-
- private void startPingChecker() {
- if (checkerThread == null) {
- checkerThread = new Thread(new PingChecker());
- checkerThread.setName("Ping Checker");
- checkerThread.start();
- }
- }
-
- @Override
- public void register() throws IOException {
- FileStatus[] files = fs.listStatus(activePath);
-
- // Phase 1: If there is not another active master, this try to become active master.
- if (files.length == 0) {
- createMasterFile(true);
- currentActiveMaster = masterName;
- LOG.info(String.format("This is added to active master (%s)", masterName));
- } else {
- // Phase 2: If there is active master information, we need to check its status.
- Path activePath = files[0].getPath();
- currentActiveMaster = activePath.getName().replaceAll("_", ":");
-
- // Phase 3: If current active master is dead, this master should be active master.
- if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
- fs.delete(activePath, true);
- createMasterFile(true);
- currentActiveMaster = masterName;
- LOG.info(String.format("This is added to active master (%s)", masterName));
- } else {
- // Phase 4: If current active master is alive, this master need to be backup master.
- createMasterFile(false);
- LOG.info(String.format("This is added to backup masters (%s)", masterName));
- }
- }
- }
-
- private void createMasterFile(boolean isActive) throws IOException {
- String fileName = masterName.replaceAll(":", "_");
- Path path = null;
-
- if (isActive) {
- path = new Path(activePath, fileName);
- } else {
- path = new Path(backupPath, fileName);
- }
-
- StringBuilder sb = new StringBuilder();
- InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.CATALOG_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_");
-
- address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS);
- sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort());
-
- FSDataOutputStream out = fs.create(path);
-
- try {
- out.writeUTF(sb.toString());
- out.hflush();
- out.close();
- } catch (FileAlreadyExistsException e) {
- createMasterFile(false);
- }
-
- if (isActive) {
- isActiveStatus = true;
- } else {
- isActiveStatus = false;
- }
-
- startPingChecker();
- }
-
-
- private InetSocketAddress getHostAddress(int type) {
- InetSocketAddress address = null;
-
- switch (type) {
- case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- break;
- case HAConstants.MASTER_CLIENT_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_CLIENT_RPC_ADDRESS);
- break;
- case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .RESOURCE_TRACKER_RPC_ADDRESS);
- break;
- case HAConstants.CATALOG_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .CATALOG_ADDRESS);
- break;
- case HAConstants.MASTER_INFO_ADDRESS:
- address = context.getConf().getSocketAddrVar(TajoConf.ConfVars
- .TAJO_MASTER_INFO_ADDRESS);
- default:
- break;
- }
-
- return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort());
- }
-
- @Override
- public void delete() throws IOException {
- String fileName = masterName.replaceAll(":", "_");
-
- Path activeFile = new Path(activePath, fileName);
- if (fs.exists(activeFile)) {
- fs.delete(activeFile, true);
- }
-
- Path backupFile = new Path(backupPath, fileName);
- if (fs.exists(backupFile)) {
- fs.delete(backupFile, true);
- }
- if (isActiveStatus) {
- isActiveStatus = false;
- }
- stopped = true;
- }
-
- @Override
- public boolean isActiveStatus() {
- return isActiveStatus;
- }
-
- @Override
- public List<TajoMasterInfo> getMasters() throws IOException {
- List<TajoMasterInfo> list = TUtil.newList();
- Path path = null;
-
- FileStatus[] files = fs.listStatus(activePath);
- if (files.length == 1) {
- path = files[0].getPath();
- list.add(createTajoMasterInfo(path, true));
- }
-
- files = fs.listStatus(backupPath);
- for (FileStatus status : files) {
- path = status.getPath();
- list.add(createTajoMasterInfo(path, false));
- }
-
- return list;
- }
-
- private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
- String masterAddress = path.getName().replaceAll("_", ":");
- boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
-
- FSDataInputStream stream = fs.open(path);
- String data = stream.readUTF();
-
- stream.close();
-
- String[] addresses = data.split("_");
- TajoMasterInfo info = new TajoMasterInfo();
-
- info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
- info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
- info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
- info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
- info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
-
- info.setAvailable(isAlive);
- info.setActive(isActive);
-
- return info;
- }
-
- private class PingChecker implements Runnable {
- @Override
- public void run() {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
- synchronized (HAServiceHDFSImpl.this) {
- try {
- if (!currentActiveMaster.equals(masterName)) {
- boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
- + ", isAlive:" + isAlive);
- }
-
- // If active master is dead, this master should be active master instead of
- // previous active master.
- if (!isAlive) {
- FileStatus[] files = fs.listStatus(activePath);
- if (files.length == 0 || (files.length == 1
- && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) {
- delete();
- register();
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(monitorInterval);
- } catch (InterruptedException e) {
- LOG.info("PingChecker interrupted. - masterName:" + masterName);
- break;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
deleted file mode 100644
index 6ed975a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.ha;
-
-import java.net.InetSocketAddress;
-
-public class TajoMasterInfo {
-
- private boolean available;
- private boolean isActive;
-
- private InetSocketAddress tajoMasterAddress;
- private InetSocketAddress tajoClientAddress;
- private InetSocketAddress workerResourceTrackerAddr;
- private InetSocketAddress catalogAddress;
- private InetSocketAddress webServerAddress;
-
- public InetSocketAddress getTajoMasterAddress() {
- return tajoMasterAddress;
- }
-
- public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
- this.tajoMasterAddress = tajoMasterAddress;
- }
-
- public InetSocketAddress getTajoClientAddress() {
- return tajoClientAddress;
- }
-
- public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
- this.tajoClientAddress = tajoClientAddress;
- }
-
- public InetSocketAddress getWorkerResourceTrackerAddr() {
- return workerResourceTrackerAddr;
- }
-
- public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
- this.workerResourceTrackerAddr = workerResourceTrackerAddr;
- }
-
- public InetSocketAddress getCatalogAddress() {
- return catalogAddress;
- }
-
- public void setCatalogAddress(InetSocketAddress catalogAddress) {
- this.catalogAddress = catalogAddress;
- }
-
- public InetSocketAddress getWebServerAddress() {
- return webServerAddress;
- }
-
- public void setWebServerAddress(InetSocketAddress webServerAddress) {
- this.webServerAddress = webServerAddress;
- }
-
- public boolean isAvailable() {
- return available;
- }
-
- public void setAvailable(boolean available) {
- this.available = available;
- }
-
- public boolean isActive() {
- return isActive;
- }
-
- public void setActive(boolean active) {
- isActive = active;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
deleted file mode 100644
index 7c3d283..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.metrics;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricSet;
-import org.apache.tajo.master.TajoMaster;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
-
-public class CatalogMetricsGaugeSet implements MetricSet {
- TajoMaster.MasterContext tajoMasterContext;
- public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
- this.tajoMasterContext = tajoMasterContext;
- }
-
- @Override
- public Map<String, Metric> getMetrics() {
- Map<String, Metric> metricsMap = new HashMap<String, Metric>();
- metricsMap.put("numTables", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return tajoMasterContext.getCatalog().getAllTableNames(DEFAULT_DATABASE_NAME).size();
- }
- });
-
- metricsMap.put("numFunctions", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return tajoMasterContext.getCatalog().getFunctions().size();
- }
- });
-
- return metricsMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java b/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
deleted file mode 100644
index 993d3b7..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/metrics/WorkerResourceMetricsGaugeSet.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.metrics;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricSet;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerState;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class WorkerResourceMetricsGaugeSet implements MetricSet {
- TajoMaster.MasterContext tajoMasterContext;
- public WorkerResourceMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
- this.tajoMasterContext = tajoMasterContext;
- }
-
- @Override
- public Map<String, Metric> getMetrics() {
- Map<String, Metric> metricsMap = new HashMap<String, Metric>();
- metricsMap.put("totalWorkers", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return tajoMasterContext.getResourceManager().getWorkers().size();
- }
- });
-
- metricsMap.put("liveWorkers", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return getNumWorkers(WorkerState.RUNNING);
- }
- });
-
- metricsMap.put("deadWorkers", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return getNumWorkers(WorkerState.LOST);
- }
- });
-
- return metricsMap;
- }
-
- protected int getNumWorkers(WorkerState status) {
- int numWorkers = 0;
- for(Worker eachWorker: tajoMasterContext.getResourceManager().getWorkers().values()) {
- if(eachWorker.getState() == status) {
- numWorkers++;
- }
- }
-
- return numWorkers;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
deleted file mode 100644
index a626df1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ /dev/null
@@ -1,738 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.state.*;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.master.event.*;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.util.TUtil;
-import org.apache.tajo.util.history.QueryHistory;
-import org.apache.tajo.util.history.StageHistory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class Query implements EventHandler<QueryEvent> {
- private static final Log LOG = LogFactory.getLog(Query.class);
-
- // Facilities for Query
- private final TajoConf systemConf;
- private final Clock clock;
- private String queryStr;
- private Map<ExecutionBlockId, Stage> stages;
- private final EventHandler eventHandler;
- private final MasterPlan plan;
- QueryMasterTask.QueryMasterTaskContext context;
- private ExecutionBlockCursor cursor;
-
- // Query Status
- private final QueryId id;
- private long appSubmitTime;
- private long startTime;
- private long finishTime;
- private TableDesc resultDesc;
- private int completedStagesCount = 0;
- private int successedStagesCount = 0;
- private int killedStagesCount = 0;
- private int failedStagesCount = 0;
- private int erroredStagesCount = 0;
- private final List<String> diagnostics = new ArrayList<String>();
-
- // Internal Variables
- private final Lock readLock;
- private final Lock writeLock;
- private int priority = 100;
-
- // State Machine
- private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
- private QueryState queryState;
-
- // Transition Handler
- private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
- private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
- private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
- private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
-
- protected static final StateMachineFactory
- <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
- new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
- (QueryState.QUERY_NEW)
-
- // Transitions from NEW state
- .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING,
- QueryEventType.START,
- new StartTransition())
- .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
- QueryEventType.DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
- QueryEventType.KILL,
- new KillNewQueryTransition())
- .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
- QueryEventType.INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from RUNNING state
- .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
- QueryEventType.STAGE_COMPLETED,
- STAGE_COMPLETED_TRANSITION)
- .addTransition(QueryState.QUERY_RUNNING,
- EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
- QueryState.QUERY_ERROR),
- QueryEventType.QUERY_COMPLETED,
- QUERY_COMPLETED_TRANSITION)
- .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
- QueryEventType.DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
- QueryEventType.KILL,
- new KillAllStagesTransition())
- .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
- QueryEventType.INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from QUERY_SUCCEEDED state
- .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
- QueryEventType.DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- // ignore-able transitions
- .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
- QueryEventType.STAGE_COMPLETED,
- STAGE_COMPLETED_TRANSITION)
- .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
- QueryEventType.KILL)
- .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
- QueryEventType.INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
-
- // Transitions from KILL_WAIT state
- .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
- QueryEventType.STAGE_COMPLETED,
- STAGE_COMPLETED_TRANSITION)
- .addTransition(QueryState.QUERY_KILL_WAIT,
- EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
- QueryState.QUERY_ERROR),
- QueryEventType.QUERY_COMPLETED,
- QUERY_COMPLETED_TRANSITION)
- .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
- QueryEventType.DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
- QueryEventType.INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able transitions
- .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED),
- QueryEventType.KILL,
- QUERY_COMPLETED_TRANSITION)
-
- // Transitions from FAILED state
- .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
- QueryEventType.DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
- QueryEventType.INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able transitions
- .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
- QueryEventType.KILL)
-
- // Transitions from ERROR state
- .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
- QueryEventType.DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
- QueryEventType.INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able transitions
- .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
- EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED))
-
- .installTopology();
-
- public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
- final long appSubmitTime,
- final String queryStr,
- final EventHandler eventHandler,
- final MasterPlan plan) {
- this.context = context;
- this.systemConf = context.getConf();
- this.id = id;
- this.clock = context.getClock();
- this.appSubmitTime = appSubmitTime;
- this.queryStr = queryStr;
- this.stages = Maps.newConcurrentMap();
- this.eventHandler = eventHandler;
- this.plan = plan;
- this.cursor = new ExecutionBlockCursor(plan, true);
-
- StringBuilder sb = new StringBuilder("\n=======================================================");
- sb.append("\nThe order of execution: \n");
- int order = 1;
- while (cursor.hasNext()) {
- ExecutionBlock currentEB = cursor.nextBlock();
- sb.append("\n").append(order).append(": ").append(currentEB.getId());
- order++;
- }
- sb.append("\n=======================================================");
- LOG.info(sb);
- cursor.reset();
-
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- this.readLock = readWriteLock.readLock();
- this.writeLock = readWriteLock.writeLock();
-
- stateMachine = stateMachineFactory.make(this);
- queryState = stateMachine.getCurrentState();
- }
-
- public float getProgress() {
- QueryState state = getState();
- if (state == QueryState.QUERY_SUCCEEDED) {
- return 1.0f;
- } else {
- int idx = 0;
- List<Stage> tempStages = new ArrayList<Stage>();
- synchronized(stages) {
- tempStages.addAll(stages.values());
- }
-
- float [] subProgresses = new float[tempStages.size()];
- for (Stage stage: tempStages) {
- if (stage.getState() != StageState.NEW) {
- subProgresses[idx] = stage.getProgress();
- } else {
- subProgresses[idx] = 0.0f;
- }
- idx++;
- }
-
- float totalProgress = 0.0f;
- float proportion = 1.0f / (float)(getExecutionBlockCursor().size() - 1); // minus one is due to
-
- for (int i = 0; i < subProgresses.length; i++) {
- totalProgress += subProgresses[i] * proportion;
- }
-
- return totalProgress;
- }
- }
-
- public long getAppSubmitTime() {
- return this.appSubmitTime;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime() {
- startTime = clock.getTime();
- }
-
- public long getFinishTime() {
- return finishTime;
- }
-
- public void setFinishTime() {
- finishTime = clock.getTime();
- }
-
- public QueryHistory getQueryHistory() {
- QueryHistory queryHistory = makeQueryHistory();
- queryHistory.setStageHistories(makeStageHistories());
- return queryHistory;
- }
-
- private List<StageHistory> makeStageHistories() {
- List<StageHistory> stageHistories = new ArrayList<StageHistory>();
- for(Stage eachStage : getStages()) {
- stageHistories.add(eachStage.getStageHistory());
- }
-
- return stageHistories;
- }
-
- private QueryHistory makeQueryHistory() {
- QueryHistory queryHistory = new QueryHistory();
-
- queryHistory.setQueryId(getId().toString());
- queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName());
- queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort());
- queryHistory.setLogicalPlan(plan.toString());
- queryHistory.setLogicalPlan(plan.getLogicalPlan().toString());
- queryHistory.setDistributedPlan(plan.toString());
-
- List<String[]> sessionVariables = new ArrayList<String[]>();
- for(Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) {
- if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) {
- sessionVariables.add(new String[]{entry.getKey(), entry.getValue()});
- }
- }
- queryHistory.setSessionVariables(sessionVariables);
-
- return queryHistory;
- }
-
- public List<String> getDiagnostics() {
- readLock.lock();
- try {
- return diagnostics;
- } finally {
- readLock.unlock();
- }
- }
-
- protected void addDiagnostic(String diag) {
- diagnostics.add(diag);
- }
-
- public TableDesc getResultDesc() {
- return resultDesc;
- }
-
- public void setResultDesc(TableDesc desc) {
- resultDesc = desc;
- }
-
- public MasterPlan getPlan() {
- return plan;
- }
-
- public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
- return stateMachine;
- }
-
- public void addStage(Stage stage) {
- stages.put(stage.getId(), stage);
- }
-
- public QueryId getId() {
- return this.id;
- }
-
- public Stage getStage(ExecutionBlockId id) {
- return this.stages.get(id);
- }
-
- public Collection<Stage> getStages() {
- return this.stages.values();
- }
-
- public QueryState getSynchronizedState() {
- readLock.lock();
- try {
- return stateMachine.getCurrentState();
- } finally {
- readLock.unlock();
- }
- }
-
- /* non-blocking call for client API */
- public QueryState getState() {
- return queryState;
- }
-
- public ExecutionBlockCursor getExecutionBlockCursor() {
- return cursor;
- }
-
- public static class StartTransition
- implements SingleArcTransition<Query, QueryEvent> {
-
- @Override
- public void transition(Query query, QueryEvent queryEvent) {
-
- query.setStartTime();
- Stage stage = new Stage(query.context, query.getPlan(),
- query.getExecutionBlockCursor().nextBlock());
- stage.setPriority(query.priority--);
- query.addStage(stage);
-
- stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
- LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
- }
- }
-
- public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
-
- @Override
- public QueryState transition(Query query, QueryEvent queryEvent) {
- QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent;
- QueryState finalState;
-
- if (stageEvent.getState() == StageState.SUCCEEDED) {
- finalState = finalizeQuery(query, stageEvent);
- } else if (stageEvent.getState() == StageState.FAILED) {
- finalState = QueryState.QUERY_FAILED;
- } else if (stageEvent.getState() == StageState.KILLED) {
- finalState = QueryState.QUERY_KILLED;
- } else {
- finalState = QueryState.QUERY_ERROR;
- }
- if (finalState != QueryState.QUERY_SUCCEEDED) {
- Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
- if (lastStage != null && lastStage.getTableMeta() != null) {
- StoreType storeType = lastStage.getTableMeta().getStoreType();
- if (storeType != null) {
- LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
- try {
- StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
- } catch (IOException e) {
- LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
- }
- }
- }
- }
- query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
- query.setFinishTime();
-
- return finalState;
- }
-
- private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
- Stage lastStage = query.getStage(event.getExecutionBlockId());
- StoreType storeType = lastStage.getTableMeta().getStoreType();
- try {
- LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
- CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
- TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
-
- Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
- .commitOutputData(query.context.getQueryContext(),
- lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
-
- QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
- hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
- } catch (Exception e) {
- query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
- return QueryState.QUERY_ERROR;
- }
-
- return QueryState.QUERY_SUCCEEDED;
- }
-
- private static interface QueryHook {
- boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
- void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
- ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
- }
-
- private class QueryHookExecutor {
- private List<QueryHook> hookList = TUtil.newList();
- private QueryMaster.QueryMasterContext context;
-
- public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
- this.context = context;
- hookList.add(new MaterializedResultHook());
- hookList.add(new CreateTableHook());
- hookList.add(new InsertTableHook());
- }
-
- public void execute(QueryContext queryContext, Query query,
- ExecutionBlockId finalExecBlockId,
- Path finalOutputDir) throws Exception {
- for (QueryHook hook : hookList) {
- if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
- hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
- }
- }
- }
- }
-
- private class MaterializedResultHook implements QueryHook {
-
- @Override
- public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
- Path finalOutputDir) {
- Stage lastStage = query.getStage(finalExecBlockId);
- NodeType type = lastStage.getBlock().getPlan().getType();
- return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
- }
-
- @Override
- public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
- Query query, ExecutionBlockId finalExecBlockId,
- Path finalOutputDir) throws Exception {
- Stage lastStage = query.getStage(finalExecBlockId);
- TableMeta meta = lastStage.getTableMeta();
-
- String nullChar = queryContext.get(SessionVars.NULL_CHAR);
- meta.putOption(StorageConstants.TEXT_NULL, nullChar);
-
- TableStats stats = lastStage.getResultStats();
-
- TableDesc resultTableDesc =
- new TableDesc(
- query.getId().toString(),
- lastStage.getSchema(),
- meta,
- finalOutputDir.toUri());
- resultTableDesc.setExternal(true);
-
- stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
- resultTableDesc.setStats(stats);
- query.setResultDesc(resultTableDesc);
- }
- }
-
- private class CreateTableHook implements QueryHook {
-
- @Override
- public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
- Path finalOutputDir) {
- Stage lastStage = query.getStage(finalExecBlockId);
- return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
- }
-
- @Override
- public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
- Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
- CatalogService catalog = context.getWorkerContext().getCatalog();
- Stage lastStage = query.getStage(finalExecBlockId);
- TableStats stats = lastStage.getResultStats();
-
- CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
- TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions());
-
- TableDesc tableDescTobeCreated =
- new TableDesc(
- createTableNode.getTableName(),
- createTableNode.getTableSchema(),
- meta,
- finalOutputDir.toUri());
- tableDescTobeCreated.setExternal(createTableNode.isExternal());
-
- if (createTableNode.hasPartition()) {
- tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
- }
-
- stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
- tableDescTobeCreated.setStats(stats);
- query.setResultDesc(tableDescTobeCreated);
-
- catalog.createTable(tableDescTobeCreated);
- }
- }
-
- private class InsertTableHook implements QueryHook {
-
- @Override
- public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
- Path finalOutputDir) {
- Stage lastStage = query.getStage(finalExecBlockId);
- return lastStage.getBlock().getPlan().getType() == NodeType.INSERT;
- }
-
- @Override
- public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
- Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
- throws Exception {
-
- CatalogService catalog = context.getWorkerContext().getCatalog();
- Stage lastStage = query.getStage(finalExecBlockId);
- TableMeta meta = lastStage.getTableMeta();
- TableStats stats = lastStage.getResultStats();
-
- InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
-
- TableDesc finalTable;
- if (insertNode.hasTargetTable()) {
- String tableName = insertNode.getTableName();
- finalTable = catalog.getTableDesc(tableName);
- } else {
- String tableName = query.getId().toString();
- finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir.toUri());
- }
-
- long volume = getTableVolume(query.systemConf, finalOutputDir);
- stats.setNumBytes(volume);
- finalTable.setStats(stats);
-
- if (insertNode.hasTargetTable()) {
- UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder();
- builder.setTableName(finalTable.getName());
- builder.setStats(stats.getProto());
-
- catalog.updateTableStats(builder.build());
- }
-
- query.setResultDesc(finalTable);
- }
- }
- }
-
- public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(systemConf);
- ContentSummary directorySummary = fs.getContentSummary(tablePath);
- return directorySummary.getLength();
- }
-
- public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
-
- private boolean hasNext(Query query) {
- ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
- ExecutionBlock nextBlock = cursor.peek();
- return !query.getPlan().isTerminal(nextBlock);
- }
-
- private void executeNextBlock(Query query) {
- ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
- ExecutionBlock nextBlock = cursor.nextBlock();
- Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
- nextStage.setPriority(query.priority--);
- query.addStage(nextStage);
- nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
-
- LOG.info("Scheduling Stage:" + nextStage.getId());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
- LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
- }
- }
-
- @Override
- public void transition(Query query, QueryEvent event) {
- try {
- query.completedStagesCount++;
- StageCompletedEvent castEvent = (StageCompletedEvent) event;
-
- if (castEvent.getState() == StageState.SUCCEEDED) {
- query.successedStagesCount++;
- } else if (castEvent.getState() == StageState.KILLED) {
- query.killedStagesCount++;
- } else if (castEvent.getState() == StageState.FAILED) {
- query.failedStagesCount++;
- } else if (castEvent.getState() == StageState.ERROR) {
- query.erroredStagesCount++;
- } else {
- LOG.error(String.format("Invalid Stage (%s) State %s at %s",
- castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name()));
- query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
- }
-
- // if a stage is succeeded and a query is running
- if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded
- query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR.
- hasNext(query)) { // there remains at least one stage.
- query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
- executeNextBlock(query);
- } else { // if a query is completed due to finished, kill, failure, or error
- query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
- }
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
- }
- }
- }
-
- private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
- @Override
- public void transition(Query query, QueryEvent event) {
- query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
- }
- }
-
- private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
- @Override
- public void transition(Query query, QueryEvent event) {
- query.setFinishTime();
- query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
- }
- }
-
- private static class KillAllStagesTransition implements SingleArcTransition<Query, QueryEvent> {
- @Override
- public void transition(Query query, QueryEvent event) {
- synchronized (query.stages) {
- for (Stage stage : query.stages.values()) {
- query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
- }
- }
- }
- }
-
- private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
-
- @Override
- public void transition(Query query, QueryEvent event) {
- query.setFinishTime();
- query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
- }
- }
-
- @Override
- public void handle(QueryEvent event) {
- LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
- try {
- writeLock.lock();
- QueryState oldState = getSynchronizedState();
- try {
- getStateMachine().doTransition(event.getType(), event);
- queryState = getSynchronizedState();
- } catch (InvalidStateTransitonException e) {
- LOG.error("Can't handle this event at current state"
- + ", type:" + event
- + ", oldState:" + oldState.name()
- + ", nextState:" + getSynchronizedState().name()
- , e);
- eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
- }
-
- //notify the eventhandler of state change
- if (oldState != getSynchronizedState()) {
- LOG.info(id + " Query Transitioned from " + oldState + " to " + getSynchronizedState());
- }
- }
-
- finally {
- writeLock.unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
deleted file mode 100644
index 0a87990..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.master.session.Session;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-
-public class QueryInProgress extends CompositeService {
- private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
-
- private QueryId queryId;
-
- private Session session;
-
- private AsyncDispatcher dispatcher;
-
- private LogicalRootNode plan;
-
- private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- private QueryInfo queryInfo;
-
- private final TajoMaster.MasterContext masterContext;
-
- private NettyClientBase queryMasterRpc;
-
- private QueryMasterProtocolService queryMasterRpcClient;
-
- public QueryInProgress(
- TajoMaster.MasterContext masterContext,
- Session session,
- QueryContext queryContext,
- QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
- super(QueryInProgress.class.getName());
- this.masterContext = masterContext;
- this.session = session;
- this.queryId = queryId;
- this.plan = plan;
-
- queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
- queryInfo.setStartTime(System.currentTimeMillis());
- }
-
- @Override
- public void init(Configuration conf) {
- dispatcher = new AsyncDispatcher();
- this.addService(dispatcher);
-
- dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
- super.init(conf);
- }
-
- public synchronized void kill() {
- if(queryMasterRpcClient != null){
- queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
- }
- }
-
- @Override
- public void stop() {
- if(stopped.getAndSet(true)) {
- return;
- }
-
- LOG.info("=========================================================");
- LOG.info("Stop query:" + queryId);
-
- masterContext.getResourceManager().stopQueryMaster(queryId);
-
- long startTime = System.currentTimeMillis();
- while(true) {
- try {
- if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
- LOG.info(queryId + " QueryMaster stopped");
- break;
- }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- break;
- }
-
- try {
- synchronized (this){
- wait(100);
- }
- } catch (InterruptedException e) {
- break;
- }
- if(System.currentTimeMillis() - startTime > 60 * 1000) {
- LOG.warn("Failed to stop QueryMaster:" + queryId);
- break;
- }
- }
-
- if(queryMasterRpc != null) {
- RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
- }
-
- masterContext.getHistoryWriter().appendHistory(queryInfo);
- super.stop();
- }
-
- @Override
- public void start() {
- super.start();
- }
-
- public EventHandler getEventHandler() {
- return dispatcher.getEventHandler();
- }
-
-
-
- public boolean startQueryMaster() {
- try {
- LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
- WorkerResourceManager resourceManager = masterContext.getResourceManager();
- WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
-
- // if no resource to allocate a query master
- if(resource == null) {
- LOG.info("No Available Resources for QueryMaster");
- return false;
- }
-
- queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
- queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
- queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
- queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
-
- getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
-
- return true;
- } catch (Exception e) {
- catchException(e);
- return false;
- }
- }
-
- class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
- @Override
- public void handle(QueryJobEvent queryJobEvent) {
- if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
- heartbeat(queryJobEvent.getQueryInfo());
- } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
- QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
- queryInProgress.getEventHandler().handle(
- new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
- } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
- submmitQueryToMaster();
- } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
- kill();
- }
- }
- }
-
- private void connectQueryMaster() throws Exception {
- InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
- LOG.info("Connect to QueryMaster:" + addr);
- queryMasterRpc =
- RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
- queryMasterRpcClient = queryMasterRpc.getStub();
- }
-
- private synchronized void submmitQueryToMaster() {
- if(querySubmitted.get()) {
- return;
- }
-
- try {
- if(queryMasterRpcClient == null) {
- connectQueryMaster();
- }
- if(queryMasterRpcClient == null) {
- LOG.info("No QueryMaster conneciton info.");
- //TODO wait
- return;
- }
- LOG.info("Call executeQuery to :" +
- queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
-
- QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
- builder.setQueryId(queryId.getProto())
- .setQueryContext(queryInfo.getQueryContext().getProto())
- .setSession(session.getProto())
- .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
- .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
-
- queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
- querySubmitted.set(true);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- public void catchException(Exception e) {
- LOG.error(e.getMessage(), e);
- queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
- queryInfo.setLastMessage(StringUtils.stringifyException(e));
- }
-
- public QueryId getQueryId() {
- return queryId;
- }
-
- public QueryInfo getQueryInfo() {
- return this.queryInfo;
- }
-
- public boolean isStarted() {
- return !stopped.get() && this.querySubmitted.get();
- }
-
- private void heartbeat(QueryInfo queryInfo) {
- LOG.info("Received QueryMaster heartbeat:" + queryInfo);
-
- // to avoid partial update by different heartbeats
- synchronized (this.queryInfo) {
-
- // terminal state will let client to retrieve a query result
- // So, we must set the query result before changing query state
- if (isFinishState(queryInfo.getQueryState())) {
- if (queryInfo.hasResultdesc()) {
- this.queryInfo.setResultDesc(queryInfo.getResultDesc());
- }
- }
-
- this.queryInfo.setQueryState(queryInfo.getQueryState());
- this.queryInfo.setProgress(queryInfo.getProgress());
- this.queryInfo.setFinishTime(queryInfo.getFinishTime());
-
- // Update diagnosis message
- if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
- this.queryInfo.setLastMessage(queryInfo.getLastMessage());
- LOG.info(queryId + queryInfo.getLastMessage());
- }
-
- // if any error occurs, print outs the error message
- if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
- LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
- }
-
-
- if (isFinishState(this.queryInfo.getQueryState())) {
- masterContext.getQueryJobManager().getEventHandler().handle(
- new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
- }
- }
- }
-
- private boolean isFinishState(TajoProtos.QueryState state) {
- return state == TajoProtos.QueryState.QUERY_FAILED ||
- state == TajoProtos.QueryState.QUERY_KILLED ||
- state == TajoProtos.QueryState.QUERY_SUCCEEDED;
- }
-}