You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/08 17:17:33 UTC
[16/16] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Conflicts:
tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e04c65fd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e04c65fd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e04c65fd
Branch: refs/heads/index_support
Commit: e04c65fdde7587322e2b6c47b7b9ffe951d9e8bc
Parents: 071c5d0 1c29c1c
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Jan 9 01:17:02 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Jan 9 01:17:02 2015 +0900
----------------------------------------------------------------------
CHANGES | 4 +
.../tajo/engine/function/builtin/AvgDouble.java | 25 +-
.../tajo/engine/function/builtin/AvgFloat.java | 12 +-
.../tajo/engine/function/builtin/AvgInt.java | 14 +-
.../tajo/engine/function/builtin/AvgLong.java | 27 +-
.../tajo/engine/function/builtin/Max.java | 76 +
.../tajo/engine/function/builtin/MaxDouble.java | 43 +-
.../tajo/engine/function/builtin/MaxFloat.java | 43 +-
.../tajo/engine/function/builtin/MaxInt.java | 44 +-
.../tajo/engine/function/builtin/MaxLong.java | 32 +-
.../tajo/engine/function/builtin/MaxString.java | 52 +-
.../tajo/engine/function/builtin/Min.java | 76 +
.../tajo/engine/function/builtin/MinDouble.java | 41 +-
.../tajo/engine/function/builtin/MinFloat.java | 42 +-
.../tajo/engine/function/builtin/MinInt.java | 43 +-
.../tajo/engine/function/builtin/MinLong.java | 32 +-
.../tajo/engine/function/builtin/MinString.java | 46 +-
.../tajo/engine/function/builtin/SumDouble.java | 34 +-
.../tajo/engine/function/builtin/SumFloat.java | 33 +-
.../tajo/engine/function/builtin/SumInt.java | 32 +-
.../tajo/engine/function/builtin/SumLong.java | 34 +-
.../DistinctGroupbySortAggregationExec.java | 41 +-
.../apache/tajo/engine/query/QueryContext.java | 2 +-
.../main/java/org/apache/tajo/ha/HAService.java | 56 +
.../org/apache/tajo/ha/HAServiceHDFSImpl.java | 316 +++++
.../java/org/apache/tajo/ha/TajoMasterInfo.java | 89 ++
.../tajo/master/AbstractTaskScheduler.java | 56 -
.../org/apache/tajo/master/ContainerProxy.java | 2 +-
.../tajo/master/DefaultTaskScheduler.java | 928 ------------
.../apache/tajo/master/FetchScheduleEvent.java | 40 -
.../org/apache/tajo/master/FragmentPair.java | 73 -
.../org/apache/tajo/master/GlobalEngine.java | 2 +-
.../NonForwardQueryResultFileScanner.java | 164 ---
.../master/NonForwardQueryResultScanner.java | 46 -
.../NonForwardQueryResultSystemScanner.java | 600 --------
.../java/org/apache/tajo/master/QueryInfo.java | 235 +++
.../org/apache/tajo/master/QueryJobManager.java | 311 ++++
.../apache/tajo/master/ScheduledFetches.java | 49 -
.../apache/tajo/master/TajoContainerProxy.java | 2 +-
.../java/org/apache/tajo/master/TajoMaster.java | 11 +-
.../tajo/master/TajoMasterClientService.java | 13 +-
.../apache/tajo/master/TajoMasterService.java | 2 -
.../tajo/master/TaskSchedulerContext.java | 65 -
.../tajo/master/TaskSchedulerFactory.java | 69 -
.../tajo/master/event/QueryCompletedEvent.java | 2 +-
.../tajo/master/event/QueryStartEvent.java | 2 +-
.../tajo/master/event/StageCompletedEvent.java | 2 +-
.../event/TaskAttemptToSchedulerEvent.java | 2 +-
.../apache/tajo/master/exec/DDLExecutor.java | 1 -
.../exec/NonForwardQueryResultFileScanner.java | 164 +++
.../exec/NonForwardQueryResultScanner.java | 46 +
.../NonForwardQueryResultSystemScanner.java | 600 ++++++++
.../apache/tajo/master/exec/QueryExecutor.java | 9 +-
.../org/apache/tajo/master/ha/HAService.java | 56 -
.../tajo/master/ha/HAServiceHDFSImpl.java | 318 -----
.../apache/tajo/master/ha/TajoMasterInfo.java | 89 --
.../master/metrics/CatalogMetricsGaugeSet.java | 56 -
.../metrics/WorkerResourceMetricsGaugeSet.java | 74 -
.../apache/tajo/master/querymaster/Query.java | 783 ----------
.../master/querymaster/QueryInProgress.java | 300 ----
.../tajo/master/querymaster/QueryInfo.java | 235 ---
.../tajo/master/querymaster/QueryJobEvent.java | 45 -
.../master/querymaster/QueryJobManager.java | 310 ----
.../tajo/master/querymaster/QueryMaster.java | 631 --------
.../querymaster/QueryMasterManagerService.java | 263 ----
.../master/querymaster/QueryMasterRunner.java | 149 --
.../master/querymaster/QueryMasterTask.java | 647 ---------
.../tajo/master/querymaster/Repartitioner.java | 1251 ----------------
.../apache/tajo/master/querymaster/Stage.java | 1342 ------------------
.../tajo/master/querymaster/StageState.java | 30 -
.../apache/tajo/master/querymaster/Task.java | 907 ------------
.../tajo/master/querymaster/TaskAttempt.java | 443 ------
.../master/rm/TajoWorkerResourceManager.java | 3 +-
.../tajo/master/rm/WorkerResourceManager.java | 2 +-
.../master/scheduler/QuerySchedulingInfo.java | 55 +
.../apache/tajo/master/scheduler/Scheduler.java | 41 +
.../master/scheduler/SchedulingAlgorithms.java | 47 +
.../master/scheduler/SimpleFifoScheduler.java | 147 ++
.../master/session/InvalidSessionException.java | 25 -
.../session/NoSuchSessionVariableException.java | 25 -
.../org/apache/tajo/master/session/Session.java | 196 ---
.../tajo/master/session/SessionConstants.java | 23 -
.../tajo/master/session/SessionEvent.java | 34 -
.../tajo/master/session/SessionEventType.java | 24 -
.../session/SessionLivelinessMonitor.java | 53 -
.../tajo/master/session/SessionManager.java | 144 --
.../tajo/metrics/CatalogMetricsGaugeSet.java | 56 +
.../metrics/WorkerResourceMetricsGaugeSet.java | 74 +
.../tajo/querymaster/AbstractTaskScheduler.java | 56 +
.../tajo/querymaster/DefaultTaskScheduler.java | 926 ++++++++++++
.../tajo/querymaster/FetchScheduleEvent.java | 40 +
.../java/org/apache/tajo/querymaster/Query.java | 783 ++++++++++
.../tajo/querymaster/QueryInProgress.java | 301 ++++
.../apache/tajo/querymaster/QueryJobEvent.java | 46 +
.../apache/tajo/querymaster/QueryMaster.java | 631 ++++++++
.../querymaster/QueryMasterManagerService.java | 262 ++++
.../tajo/querymaster/QueryMasterTask.java | 650 +++++++++
.../apache/tajo/querymaster/Repartitioner.java | 1250 ++++++++++++++++
.../java/org/apache/tajo/querymaster/Stage.java | 1342 ++++++++++++++++++
.../org/apache/tajo/querymaster/StageState.java | 30 +
.../java/org/apache/tajo/querymaster/Task.java | 897 ++++++++++++
.../apache/tajo/querymaster/TaskAttempt.java | 443 ++++++
.../tajo/querymaster/TaskSchedulerContext.java | 65 +
.../tajo/querymaster/TaskSchedulerFactory.java | 68 +
.../tajo/scheduler/QuerySchedulingInfo.java | 55 -
.../org/apache/tajo/scheduler/Scheduler.java | 41 -
.../tajo/scheduler/SchedulingAlgorithms.java | 47 -
.../tajo/scheduler/SimpleFifoScheduler.java | 147 --
.../tajo/session/InvalidSessionException.java | 25 +
.../session/NoSuchSessionVariableException.java | 25 +
.../java/org/apache/tajo/session/Session.java | 196 +++
.../apache/tajo/session/SessionConstants.java | 23 +
.../org/apache/tajo/session/SessionEvent.java | 34 +
.../apache/tajo/session/SessionEventType.java | 24 +
.../tajo/session/SessionLivelinessMonitor.java | 53 +
.../org/apache/tajo/session/SessionManager.java | 144 ++
.../main/java/org/apache/tajo/util/JSPUtil.java | 10 +-
.../apache/tajo/util/history/HistoryReader.java | 2 +-
.../apache/tajo/util/history/HistoryWriter.java | 2 +-
.../java/org/apache/tajo/worker/FetchImpl.java | 4 +-
.../tajo/worker/TajoResourceAllocator.java | 6 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 6 +-
.../tajo/worker/TajoWorkerClientService.java | 2 +-
tajo-core/src/main/resources/tajo-default.xml | 2 +-
.../resources/webapps/admin/catalogview.jsp | 2 +-
.../main/resources/webapps/admin/cluster.jsp | 4 +-
.../src/main/resources/webapps/admin/index.jsp | 6 +-
.../src/main/resources/webapps/admin/query.jsp | 4 +-
.../resources/webapps/admin/query_executor.jsp | 2 +-
.../src/main/resources/webapps/worker/index.jsp | 4 +-
.../resources/webapps/worker/querydetail.jsp | 4 +-
.../main/resources/webapps/worker/queryplan.jsp | 6 +-
.../resources/webapps/worker/querytasks.jsp | 2 +-
.../src/main/resources/webapps/worker/task.jsp | 8 +-
.../apache/tajo/LocalTajoTestingUtility.java | 2 +-
.../org/apache/tajo/TajoTestingCluster.java | 8 +-
.../engine/function/TestBuiltinFunctions.java | 234 +++
.../tajo/engine/planner/TestLogicalPlanner.java | 2 +-
.../planner/physical/TestPhysicalPlanner.java | 2 +-
.../tajo/engine/query/TestGroupByQuery.java | 12 +-
.../tajo/engine/query/TestJoinBroadcast.java | 2 +-
.../tajo/engine/query/TestTablePartitions.java | 2 +-
.../apache/tajo/ha/TestHAServiceHDFSImpl.java | 153 ++
.../TestNonForwardQueryResultSystemScanner.java | 4 +-
.../apache/tajo/master/TestRepartitioner.java | 8 +-
.../tajo/master/ha/TestHAServiceHDFSImpl.java | 158 ---
.../querymaster/TestIntermediateEntry.java | 53 -
.../tajo/master/querymaster/TestKillQuery.java | 125 --
.../master/querymaster/TestQueryProgress.java | 75 -
.../querymaster/TestTaskStatusUpdate.java | 194 ---
.../master/scheduler/TestFifoScheduler.java | 116 ++
.../tajo/querymaster/TestIntermediateEntry.java | 53 +
.../apache/tajo/querymaster/TestKillQuery.java | 125 ++
.../tajo/querymaster/TestQueryProgress.java | 75 +
.../tajo/querymaster/TestTaskStatusUpdate.java | 194 +++
.../tajo/scheduler/TestFifoScheduler.java | 116 --
.../java/org/apache/tajo/util/TestJSPUtil.java | 2 +-
.../util/history/TestHistoryWriterReader.java | 2 +-
.../org/apache/tajo/worker/TestHistory.java | 2 +-
.../testAvgLongOverflow.sql | 1 +
.../testAvgLongOverflow.result | 3 +
.../testGroupByWithNullData2.result | 2 +-
.../testGroupByWithNullData3.result | 2 +-
.../testGroupByWithNullData4.result | 2 +-
.../testGroupByWithNullData6.result | 2 +-
.../testGroupByWithNullData7.result | 2 +-
.../testGroupByWithNullData8.result | 2 +-
.../testLeftOuterJoinWithEmptyTable2.result | 10 +-
.../testLeftOuterJoinWithEmptyTable4.result | 2 +-
.../testLeftOuterJoinWithEmptyTable2.result | 10 +-
.../testLeftOuterJoinWithEmptyTable4.result | 2 +-
.../testLeftOuterJoinWithEmptyTable5.result | 4 +-
tajo-dist/pom.xml | 8 +-
173 files changed, 11976 insertions(+), 12138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 0000000,f645dc5..00a5362
mode 000000,100644..100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@@ -1,0 -1,616 +1,600 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.master.exec;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Stack;
+
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.tajo.QueryId;
+ import org.apache.tajo.TaskAttemptId;
+ import org.apache.tajo.TaskId;
+ import org.apache.tajo.catalog.CatalogUtil;
+ import org.apache.tajo.catalog.Column;
+ import org.apache.tajo.catalog.Schema;
+ import org.apache.tajo.catalog.TableDesc;
+ import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
++import org.apache.tajo.catalog.proto.CatalogProtos.*;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.common.TajoDataTypes.DataType;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.datum.DatumFactory;
+ import org.apache.tajo.engine.codegen.CompilationError;
+ import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+ import org.apache.tajo.engine.planner.Projector;
+ import org.apache.tajo.engine.planner.global.ExecutionBlock;
+ import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+ import org.apache.tajo.engine.planner.global.GlobalPlanner;
+ import org.apache.tajo.engine.planner.global.MasterPlan;
+ import org.apache.tajo.engine.planner.physical.PhysicalExec;
+ import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.master.TajoMaster.MasterContext;
+ import org.apache.tajo.plan.LogicalPlan;
+ import org.apache.tajo.plan.PlanningException;
+ import org.apache.tajo.plan.expr.EvalNode;
+ import org.apache.tajo.plan.logical.IndexScanNode;
+ import org.apache.tajo.plan.logical.LogicalNode;
+ import org.apache.tajo.plan.logical.ScanNode;
+ import org.apache.tajo.storage.RowStoreUtil;
+ import org.apache.tajo.storage.Tuple;
+ import org.apache.tajo.storage.VTuple;
+ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+ import org.apache.tajo.util.KeyValueSet;
+ import org.apache.tajo.util.TUtil;
+ import org.apache.tajo.worker.TaskAttemptContext;
+
+ import com.google.protobuf.ByteString;
+
+ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
+
+ private final Log LOG = LogFactory.getLog(getClass());
+
+ private MasterContext masterContext;
+ private LogicalPlan logicalPlan;
+ private final QueryId queryId;
+ private final String sessionId;
+ private TaskAttemptContext taskContext;
+ private int currentRow;
+ private long maxRow;
+ private TableDesc tableDesc;
+ private Schema outSchema;
+ private RowStoreEncoder encoder;
+ private PhysicalExec physicalExec;
+
+ public NonForwardQueryResultSystemScanner(MasterContext context, LogicalPlan plan, QueryId queryId,
+ String sessionId, int maxRow) {
+ masterContext = context;
+ logicalPlan = plan;
+ this.queryId = queryId;
+ this.sessionId = sessionId;
+ this.maxRow = maxRow;
+
+ }
+
+ @Override
+ public void init() throws IOException {
+ QueryContext queryContext = new QueryContext(masterContext.getConf());
+ currentRow = 0;
+
+ MasterPlan masterPlan = new MasterPlan(queryId, queryContext, logicalPlan);
+ GlobalPlanner globalPlanner = new GlobalPlanner(masterContext.getConf(), masterContext.getCatalog());
+ try {
+ globalPlanner.build(masterPlan);
+ } catch (PlanningException e) {
+ throw new RuntimeException(e);
+ }
+
+ ExecutionBlockCursor cursor = new ExecutionBlockCursor(masterPlan);
+ ExecutionBlock leafBlock = null;
+ while (cursor.hasNext()) {
+ ExecutionBlock block = cursor.nextBlock();
+ if (masterPlan.isLeaf(block)) {
+ leafBlock = block;
+ break;
+ }
+ }
+
+ taskContext = new TaskAttemptContext(queryContext, null,
+ new TaskAttemptId(new TaskId(leafBlock.getId(), 0), 0),
+ null, null);
+ physicalExec = new SimplePhysicalPlannerImpl(masterContext.getConf())
+ .createPlan(taskContext, leafBlock.getPlan());
+
+ tableDesc = new TableDesc("table_"+System.currentTimeMillis(), physicalExec.getSchema(),
+ new TableMeta(StoreType.SYSTEM, new KeyValueSet()), null);
+ outSchema = physicalExec.getSchema();
+ encoder = RowStoreUtil.createEncoder(getLogicalSchema());
+
+ physicalExec.init();
+ }
+
+ @Override
+ public void close() throws Exception {
+ tableDesc = null;
+ outSchema = null;
+ encoder = null;
+ if (physicalExec != null) {
+ try {
+ physicalExec.close();
+ } catch (Exception ignored) {}
+ }
+ physicalExec = null;
+ currentRow = -1;
+ }
+
+ private List<Tuple> getTablespaces(Schema outSchema) {
+ List<TablespaceProto> tablespaces = masterContext.getCatalog().getAllTablespaces();
+ List<Tuple> tuples = new ArrayList<Tuple>(tablespaces.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TablespaceProto tablespace: tablespaces) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+ if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
+ if (tablespace.hasId()) {
+ aTuple.put(fieldId, DatumFactory.createInt4(tablespace.getId()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("space_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(tablespace.getSpaceName()));
+ } else if ("space_handler".equalsIgnoreCase(column.getSimpleName())) {
+ if (tablespace.hasHandler()) {
+ aTuple.put(fieldId, DatumFactory.createText(tablespace.getHandler()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("space_uri".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(tablespace.getUri()));
+ }
+ }
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getDatabases(Schema outSchema) {
+ List<DatabaseProto> databases = masterContext.getCatalog().getAllDatabases();
+ List<Tuple> tuples = new ArrayList<Tuple>(databases.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (DatabaseProto database: databases) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+ if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(database.getId()));
+ } else if ("db_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(database.getName()));
+ } else if ("space_id".equalsIgnoreCase(column.getSimpleName())) {
+ if (database.hasSpaceId()) {
+ aTuple.put(fieldId, DatumFactory.createInt4(database.getSpaceId()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getTables(Schema outSchema) {
+ List<TableDescriptorProto> tables = masterContext.getCatalog().getAllTables();
+ List<Tuple> tuples = new ArrayList<Tuple>(tables.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TableDescriptorProto table: tables) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+ if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(table.getTid()));
+ } else if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(table.getDbId()));
+ } else if ("table_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(table.getName()));
+ } else if ("table_type".equalsIgnoreCase(column.getSimpleName())) {
+ if (table.hasTableType()) {
+ aTuple.put(fieldId, DatumFactory.createText(table.getTableType()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(table.getPath()));
+ } else if ("store_type".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(table.getStoreType()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getColumns(Schema outSchema) {
+ List<ColumnProto> columnsList = masterContext.getCatalog().getAllColumns();
+ List<Tuple> tuples = new ArrayList<Tuple>(columnsList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+ int columnId = 1, prevtid = -1, tid = 0;
+
+ for (ColumnProto column: columnsList) {
+ aTuple = new VTuple(outSchema.size());
+
+ tid = column.getTid();
+ if (prevtid != tid) {
+ columnId = 1;
+ prevtid = tid;
+ }
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column colObj = columns.get(fieldId);
+
+ if ("tid".equalsIgnoreCase(colObj.getSimpleName())) {
+ if (column.hasTid()) {
+ aTuple.put(fieldId, DatumFactory.createInt4(tid));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("column_name".equalsIgnoreCase(colObj.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(column.getName()));
+ } else if ("ordinal_position".equalsIgnoreCase(colObj.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(columnId));
+ } else if ("data_type".equalsIgnoreCase(colObj.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(column.getDataType().getType().toString()));
+ } else if ("type_length".equalsIgnoreCase(colObj.getSimpleName())) {
+ DataType dataType = column.getDataType();
+ if (dataType.hasLength()) {
+ aTuple.put(fieldId, DatumFactory.createInt4(dataType.getLength()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ }
+ }
+
+ columnId++;
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
-
++
+ private List<Tuple> getIndexes(Schema outSchema) {
- List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
++ List<IndexDescProto> indexList = masterContext.getCatalog().getAllIndexes();
+ List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
-
- for (IndexProto index: indexList) {
++
++ for (IndexDescProto index: indexList) {
+ aTuple = new VTuple(outSchema.size());
-
++
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
-
++
+ if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
++ aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getDbId()));
+ } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
++ aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getTid()));
+ } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
- } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
- } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
- } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
- } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
- } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
- } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
++ } else if ("index_method".equalsIgnoreCase(column.getSimpleName())) {
++ aTuple.put(fieldId, DatumFactory.createText(index.getIndexMethod().name()));
++ } else if ("index_path".equalsIgnoreCase(column.getSimpleName())) {
++ aTuple.put(fieldId, DatumFactory.createText(index.getIndexPath()));
+ }
+ }
-
++
+ tuples.add(aTuple);
+ }
-
++
+ return tuples;
+ }
+
+ private List<Tuple> getAllTableOptions(Schema outSchema) {
+ List<TableOptionProto> optionList = masterContext.getCatalog().getAllTableOptions();
+ List<Tuple> tuples = new ArrayList<Tuple>(optionList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TableOptionProto option: optionList) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+
+ if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(option.getTid()));
+ } else if ("key_".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getKey()));
+ } else if ("value_".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(option.getKeyval().getValue()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getAllTableStats(Schema outSchema) {
+ List<TableStatsProto> statList = masterContext.getCatalog().getAllTableStats();
+ List<Tuple> tuples = new ArrayList<Tuple>(statList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TableStatsProto stat: statList) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+
+ if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(stat.getTid()));
+ } else if ("num_rows".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumRows()));
+ } else if ("num_bytes".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt8(stat.getNumBytes()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> getAllPartitions(Schema outSchema) {
+ List<TablePartitionProto> partitionList = masterContext.getCatalog().getAllPartitions();
+ List<Tuple> tuples = new ArrayList<Tuple>(partitionList.size());
+ List<Column> columns = outSchema.getColumns();
+ Tuple aTuple;
+
+ for (TablePartitionProto partition: partitionList) {
+ aTuple = new VTuple(outSchema.size());
+
+ for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
+ Column column = columns.get(fieldId);
+
+ if ("pid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid()));
+ } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid()));
+ } else if ("partition_name".equalsIgnoreCase(column.getSimpleName())) {
+ if (partition.hasPartitionName()) {
+ aTuple.put(fieldId, DatumFactory.createText(partition.getPartitionName()));
+ } else {
+ aTuple.put(fieldId, DatumFactory.createNullDatum());
+ }
+ } else if ("ordinal_position".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition()));
+ } else if ("path".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(partition.getPath()));
+ }
+ }
+
+ tuples.add(aTuple);
+ }
+
+ return tuples;
+ }
+
+ private List<Tuple> fetchSystemTable(TableDesc tableDesc, Schema inSchema) {
+ List<Tuple> tuples = null;
+ String tableName = CatalogUtil.extractSimpleName(tableDesc.getName());
+
+ if ("tablespace".equalsIgnoreCase(tableName)) {
+ tuples = getTablespaces(inSchema);
+ } else if ("databases".equalsIgnoreCase(tableName)) {
+ tuples = getDatabases(inSchema);
+ } else if ("tables".equalsIgnoreCase(tableName)) {
+ tuples = getTables(inSchema);
+ } else if ("columns".equalsIgnoreCase(tableName)) {
+ tuples = getColumns(inSchema);
+ } else if ("indexes".equalsIgnoreCase(tableName)) {
+ tuples = getIndexes(inSchema);
+ } else if ("table_options".equalsIgnoreCase(tableName)) {
+ tuples = getAllTableOptions(inSchema);
+ } else if ("table_stats".equalsIgnoreCase(tableName)) {
+ tuples = getAllTableStats(inSchema);
+ } else if ("partitions".equalsIgnoreCase(tableName)) {
+ tuples = getAllPartitions(inSchema);
+ }
+
+ return tuples;
+ }
+
+ @Override
+ public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+ List<ByteString> rows = new ArrayList<ByteString>();
+ int startRow = currentRow;
+ int endRow = startRow + fetchRowNum;
+
+ if (physicalExec == null) {
+ return rows;
+ }
+
+ while (currentRow < endRow) {
+ Tuple currentTuple = physicalExec.next();
+
+ if (currentTuple == null) {
+ physicalExec.close();
+ physicalExec = null;
+ break;
+ }
+
+ currentRow++;
+ rows.add(ByteString.copyFrom(encoder.toBytes(currentTuple)));
+
+ if (currentRow >= maxRow) {
+ physicalExec.close();
+ physicalExec = null;
+ break;
+ }
+ }
+
+ return rows;
+ }
+
+ @Override
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ @Override
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ @Override
+ public TableDesc getTableDesc() {
+ return tableDesc;
+ }
+
+ @Override
+ public Schema getLogicalSchema() {
+ return outSchema;
+ }
+
+ class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {
+
+ public SimplePhysicalPlannerImpl(TajoConf conf) {
+ super(conf);
+ }
+
+ @Override
+ public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+ throws IOException {
+ return new SystemPhysicalExec(ctx, scanNode);
+ }
+
+ @Override
+ public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, IndexScanNode annotation) throws IOException {
+ return new SystemPhysicalExec(ctx, annotation);
+ }
+ }
+
+ class SystemPhysicalExec extends PhysicalExec {
+
+ private ScanNode scanNode;
+ private EvalNode qual;
+ private Projector projector;
+ private TableStats tableStats;
+ private final List<Tuple> cachedData;
+ private int currentRow;
+ private boolean isClosed;
+
+ public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) {
+ super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+ this.scanNode = scanNode;
+ this.qual = this.scanNode.getQual();
+ cachedData = TUtil.newList();
+ currentRow = 0;
+ isClosed = false;
+
+ projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ Tuple aTuple = null;
+ Tuple outTuple = new VTuple(outColumnNum);
+
+ if (isClosed) {
+ return null;
+ }
+
+ if (cachedData.size() == 0) {
+ rescan();
+ }
+
+ if (!scanNode.hasQual()) {
+ if (currentRow < cachedData.size()) {
+ aTuple = cachedData.get(currentRow++);
+ projector.eval(aTuple, outTuple);
+ outTuple.setOffset(aTuple.getOffset());
+ return outTuple;
+ }
+ return null;
+ } else {
+ while (currentRow < cachedData.size()) {
+ aTuple = cachedData.get(currentRow++);
+ if (qual.eval(inSchema, aTuple).isTrue()) {
+ projector.eval(aTuple, outTuple);
+ return outTuple;
+ }
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ cachedData.clear();
+ cachedData.addAll(fetchSystemTable(scanNode.getTableDesc(), inSchema));
+
+ tableStats = new TableStats();
+ tableStats.setNumRows(cachedData.size());
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanNode = null;
+ qual = null;
+ projector = null;
+ cachedData.clear();
+ currentRow = -1;
+ isClosed = true;
+ }
+
+ @Override
+ public float getProgress() {
+ return 1.0f;
+ }
+
+ @Override
+ protected void compile() throws CompilationError {
+ if (scanNode.hasQual()) {
+ qual = context.getPrecompiledEval(inSchema, qual);
+ }
+ }
+
+ @Override
+ public TableStats getInputStats() {
+ return tableStats;
+ }
+
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 26476a3,2fbebc1..e7a3cf7
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@@ -42,12 -40,8 +42,9 @@@ import org.apache.tajo.engine.planner.p
import org.apache.tajo.engine.planner.physical.StoreTableExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
- import org.apache.tajo.master.NonForwardQueryResultFileScanner;
- import org.apache.tajo.master.NonForwardQueryResultScanner;
- import org.apache.tajo.master.NonForwardQueryResultSystemScanner;
- import org.apache.tajo.master.TajoMaster;
+ import org.apache.tajo.master.*;
import org.apache.tajo.master.exec.prehook.CreateTableHook;
import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
import org.apache.tajo.master.exec.prehook.InsertIntoHook;
http://git-wip-us.apache.org/repos/asf/tajo/blob/e04c65fd/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 0000000,2932694..4a8c188
mode 000000,100644..100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@@ -1,0 -1,738 +1,783 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.querymaster;
+
+ import com.google.common.collect.Maps;
+ import org.apache.commons.lang.exception.ExceptionUtils;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.fs.ContentSummary;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.yarn.event.EventHandler;
+ import org.apache.hadoop.yarn.state.*;
+ import org.apache.hadoop.yarn.util.Clock;
+ import org.apache.tajo.ExecutionBlockId;
+ import org.apache.tajo.QueryId;
+ import org.apache.tajo.SessionVars;
+ import org.apache.tajo.TajoProtos.QueryState;
++import org.apache.tajo.catalog.*;
++import org.apache.tajo.catalog.exception.CatalogException;
+ import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
+ import org.apache.tajo.catalog.CatalogService;
+ import org.apache.tajo.catalog.TableDesc;
+ import org.apache.tajo.catalog.TableMeta;
+ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+ import org.apache.tajo.catalog.statistics.TableStats;
+ import org.apache.tajo.conf.TajoConf;
+ import org.apache.tajo.engine.planner.global.ExecutionBlock;
+ import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+ import org.apache.tajo.engine.planner.global.MasterPlan;
+ import org.apache.tajo.plan.logical.*;
+ import org.apache.tajo.engine.query.QueryContext;
+ import org.apache.tajo.master.event.*;
+ import org.apache.tajo.plan.util.PlannerUtil;
+ import org.apache.tajo.storage.StorageManager;
+ import org.apache.tajo.storage.StorageConstants;
+ import org.apache.tajo.util.TUtil;
+ import org.apache.tajo.util.history.QueryHistory;
+ import org.apache.tajo.util.history.StageHistory;
+
+ import java.io.IOException;
+ import java.util.*;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+ public class Query implements EventHandler<QueryEvent> {
+ private static final Log LOG = LogFactory.getLog(Query.class);
+
+ // Facilities for Query
+ private final TajoConf systemConf;
+ private final Clock clock;
+ private String queryStr;
+ // Stages created so far, keyed by the id of their execution block.
+ private Map<ExecutionBlockId, Stage> stages;
+ private final EventHandler eventHandler;
+ private final MasterPlan plan;
+ QueryMasterTask.QueryMasterTaskContext context;
+ // Walks the execution blocks of the plan; advanced each time a stage is created.
+ private ExecutionBlockCursor cursor;
+
+ // Query Status
+ private final QueryId id;
+ private long appSubmitTime;
+ private long startTime;
+ private long finishTime;
+ private TableDesc resultDesc;
+ // Per-outcome stage counters, incremented by StageCompletedTransition.
+ private int completedStagesCount = 0;
+ private int successedStagesCount = 0;
+ private int killedStagesCount = 0;
+ private int failedStagesCount = 0;
+ private int erroredStagesCount = 0;
+ private final List<String> diagnostics = new ArrayList<String>();
+
+ // Internal Variables
+ private final Lock readLock;
+ private final Lock writeLock;
+ // Priority handed to the next stage; decremented after every assignment.
+ private int priority = 100;
+
+ // State Machine
+ private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+ // Copy of the state machine's current state, refreshed in handle() and
+ // returned by getState() without taking a lock.
+ private QueryState queryState;
+
+ // Transition Handler
+ private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+ private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
+ private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
+
+ /**
+ * Topology of the query life cycle. Every query starts in QUERY_NEW; each
+ * addTransition call below registers, for a (state, event type) pair, the
+ * resulting state (or set of possible states) and optionally the
+ * transition handler to run. Arcs registered without a handler only keep
+ * or change the state.
+ */
+ protected static final StateMachineFactory
+ <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+ new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+ (QueryState.QUERY_NEW)
+
+ // Transitions from NEW state
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING,
+ QueryEventType.START,
+ new StartTransition())
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+ QueryEventType.KILL,
+ new KillNewQueryTransition())
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from RUNNING state
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+ QueryEventType.STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_RUNNING,
+ EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+ QueryState.QUERY_ERROR),
+ QueryEventType.QUERY_COMPLETED,
+ QUERY_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.KILL,
+ new KillAllStagesTransition())
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from QUERY_SUCCEEDED state
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ // ignore-able transitions
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.KILL)
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from KILL_WAIT state
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.STAGE_COMPLETED,
+ STAGE_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT,
+ EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+ QueryState.QUERY_ERROR),
+ QueryEventType.QUERY_COMPLETED,
+ QUERY_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ // NOTE(review): this arc routes KILL events to QUERY_COMPLETED_TRANSITION,
+ // which casts its event to QueryCompletedEvent - confirm that KILL events
+ // delivered in QUERY_KILL_WAIT are instances of that class.
+ .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED),
+ QueryEventType.KILL,
+ QUERY_COMPLETED_TRANSITION)
+
+ // Transitions from FAILED state
+ .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+ QueryEventType.KILL)
+
+ // Transitions from ERROR state
+ .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+ EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED))
+
+ .installTopology();
+
+ /**
+ * Creates a query in the initial state of the state machine. The
+ * configuration and clock are taken from {@code context}. The execution
+ * blocks of {@code plan} are walked once to log their order, after which
+ * the cursor is reset.
+ */
+ public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
+ final long appSubmitTime,
+ final String queryStr,
+ final EventHandler eventHandler,
+ final MasterPlan plan) {
+ this.context = context;
+ this.id = id;
+ this.appSubmitTime = appSubmitTime;
+ this.queryStr = queryStr;
+ this.eventHandler = eventHandler;
+ this.plan = plan;
+ this.systemConf = context.getConf();
+ this.clock = context.getClock();
+ this.stages = Maps.newConcurrentMap();
+ this.cursor = new ExecutionBlockCursor(plan, true);
+
+ // Log the order in which the execution blocks will be visited.
+ StringBuilder executionOrder =
+ new StringBuilder("\n=======================================================");
+ executionOrder.append("\nThe order of execution: \n");
+ for (int position = 1; cursor.hasNext(); position++) {
+ ExecutionBlock block = cursor.nextBlock();
+ executionOrder.append("\n").append(position).append(": ").append(block.getId());
+ }
+ executionOrder.append("\n=======================================================");
+ LOG.info(executionOrder);
+ cursor.reset();
+
+ ReadWriteLock stateLock = new ReentrantReadWriteLock();
+ this.readLock = stateLock.readLock();
+ this.writeLock = stateLock.writeLock();
+
+ stateMachine = stateMachineFactory.make(this);
+ queryState = stateMachine.getCurrentState();
+ }
+
+ /**
+ * Estimates the overall progress of this query.
+ *
+ * @return {@code 1.0f} once the query has succeeded; otherwise the sum of
+ * the progress of every stage that has left the NEW state, each
+ * weighted by {@code 1 / (number of execution blocks - 1)};
+ * {@code 0.0f} when that divisor would not be positive
+ */
+ public float getProgress() {
+ if (getState() == QueryState.QUERY_SUCCEEDED) {
+ return 1.0f;
+ }
+
+ // Work on a snapshot so stages added meanwhile do not affect the loop.
+ List<Stage> snapshot;
+ synchronized (stages) {
+ snapshot = new ArrayList<Stage>(stages.values());
+ }
+
+ // One block is excluded from the weighting - presumably the terminal
+ // block, which hasNext() in StageCompletedTransition never schedules.
+ int weightedBlocks = getExecutionBlockCursor().size() - 1;
+ if (weightedBlocks <= 0) {
+ // Avoid a division by zero (Infinity/NaN) or a negative proportion.
+ return 0.0f;
+ }
+ float proportion = 1.0f / (float) weightedBlocks;
+
+ float totalProgress = 0.0f;
+ for (Stage stage : snapshot) {
+ // Stages still in NEW contribute nothing.
+ if (stage.getState() != StageState.NEW) {
+ totalProgress += stage.getProgress() * proportion;
+ }
+ }
+ return totalProgress;
+ }
+
+ /** Returns the submission time given to the constructor. */
+ public long getAppSubmitTime() {
+ return appSubmitTime;
+ }
+
+ /** Returns the time recorded by the last call to {@link #setStartTime()}. */
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ /** Records the current clock time as the start time of this query. */
+ public void setStartTime() {
+ this.startTime = this.clock.getTime();
+ }
+
+ /** Returns the time recorded by the last call to {@link #setFinishTime()}. */
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ /** Records the current clock time as the finish time of this query. */
+ public void setFinishTime() {
+ this.finishTime = this.clock.getTime();
+ }
+
+ /**
+ * Builds a history record for this query together with the history of
+ * every stage created so far.
+ */
+ public QueryHistory getQueryHistory() {
+ QueryHistory history = makeQueryHistory();
+ List<StageHistory> stageHistories = makeStageHistories();
+ history.setStageHistories(stageHistories);
+ return history;
+ }
+
+ /** Collects the history of each stage returned by {@link #getStages()}. */
+ private List<StageHistory> makeStageHistories() {
+ List<StageHistory> histories = new ArrayList<StageHistory>();
+ Iterator<Stage> stageIterator = getStages().iterator();
+ while (stageIterator.hasNext()) {
+ histories.add(stageIterator.next().getStageHistory());
+ }
+ return histories;
+ }
+
+ /**
+ * Builds the query-level part of the history: query id, query master name
+ * and HTTP port, the logical and distributed plans in text form, and the
+ * session variables that are both known to {@link SessionVars} and public.
+ */
+ private QueryHistory makeQueryHistory() {
+ QueryHistory queryHistory = new QueryHistory();
+
+ queryHistory.setQueryId(getId().toString());
+ queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName());
+ queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort());
+ // The logical plan is set exactly once. A preceding
+ // setLogicalPlan(plan.toString()) was immediately overwritten by this call.
+ queryHistory.setLogicalPlan(plan.getLogicalPlan().toString());
+ queryHistory.setDistributedPlan(plan.toString());
+
+ // Only expose session variables that are registered and marked public.
+ List<String[]> sessionVariables = new ArrayList<String[]>();
+ for (Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) {
+ if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) {
+ sessionVariables.add(new String[]{entry.getKey(), entry.getValue()});
+ }
+ }
+ queryHistory.setSessionVariables(sessionVariables);
+
+ return queryHistory;
+ }
+
+ /**
+ * Returns a snapshot of the diagnostic messages recorded so far.
+ *
+ * The copy is taken while the read lock is held, so the caller can iterate
+ * it safely; returning the live list would let it escape the lock.
+ *
+ * @return a new list holding the current diagnostics, never {@code null}
+ */
+ public List<String> getDiagnostics() {
+ readLock.lock();
+ try {
+ return new ArrayList<String>(diagnostics);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /** Appends {@code diag} to the diagnostics of this query. */
+ protected void addDiagnostic(String diag) {
+ this.diagnostics.add(diag);
+ }
+
+ /** Returns the descriptor last passed to {@link #setResultDesc(TableDesc)}, or {@code null} if none was set. */
+ public TableDesc getResultDesc() {
+ return this.resultDesc;
+ }
+
+ /** Stores {@code desc} as the result table descriptor of this query. */
+ public void setResultDesc(TableDesc desc) {
+ this.resultDesc = desc;
+ }
+
+ /** Returns the master plan given to the constructor. */
+ public MasterPlan getPlan() {
+ return this.plan;
+ }
+
+ /** Returns the state machine instance created for this query. */
+ public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+ return this.stateMachine;
+ }
+
+ /** Registers {@code stage} under its own id, replacing any stage already stored for that id. */
+ public void addStage(Stage stage) {
+ ExecutionBlockId stageId = stage.getId();
+ this.stages.put(stageId, stage);
+ }
+
+ /** Returns the id of this query. */
+ public QueryId getId() {
+ return id;
+ }
+
+ /** Returns the stage registered for {@code id}, or {@code null} if there is none. */
+ public Stage getStage(ExecutionBlockId id) {
+ return stages.get(id);
+ }
+
+ /** Returns the value view of the stage map, not a copy. */
+ public Collection<Stage> getStages() {
+ return stages.values();
+ }
+
+ /** Reads the current state of the state machine while holding the read lock. */
+ public QueryState getSynchronizedState() {
+ readLock.lock();
+ try {
+ QueryState current = stateMachine.getCurrentState();
+ return current;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Non-blocking call for the client API: returns the state cached by the
+ * last {@code handle()} call without acquiring any lock.
+ */
+ public QueryState getState() {
+ return this.queryState;
+ }
+
+ /** Returns the cursor over the execution blocks of the plan. */
+ public ExecutionBlockCursor getExecutionBlockCursor() {
+ return this.cursor;
+ }
+
+ /**
+ * Handles START: stamps the start time, creates a stage for the first
+ * execution block delivered by the cursor and sends it SQ_INIT.
+ */
+ public static class StartTransition
+ implements SingleArcTransition<Query, QueryEvent> {
+
+ @Override
+ public void transition(Query query, QueryEvent queryEvent) {
+ query.setStartTime();
+
+ ExecutionBlock firstBlock = query.getExecutionBlockCursor().nextBlock();
+ Stage firstStage = new Stage(query.context, query.getPlan(), firstBlock);
+ firstStage.setPriority(query.priority--);
+ query.addStage(firstStage);
+
+ StageEvent initEvent = new StageEvent(firstStage.getId(), StageEventType.SQ_INIT);
+ firstStage.handle(initEvent);
+ LOG.debug("Schedule unit plan: \n" + firstStage.getBlock().getPlan());
+ }
+ }
+
+ public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+ /**
+ * Decides the final state of the query and notifies the query master.
+ *
+ * For a {@link QueryCompletedEvent} the final state follows the state of
+ * the last stage: SUCCEEDED runs {@code finalizeQuery}, FAILED and KILLED
+ * map to QUERY_FAILED and QUERY_KILLED, anything else to QUERY_ERROR. If
+ * the query did not succeed, the output commit of the last stage is rolled
+ * back on a best-effort basis.
+ *
+ * This transition is also registered for KILL events in QUERY_KILL_WAIT.
+ * Such an event is not a {@link QueryCompletedEvent} and carries no stage
+ * information, so the query is simply finished as QUERY_KILLED.
+ *
+ * @return the state the query ends in
+ */
+ @Override
+ public QueryState transition(Query query, QueryEvent queryEvent) {
+ if (!(queryEvent instanceof QueryCompletedEvent)) {
+ // Repeated KILL while waiting for the stages to stop: there is no final
+ // stage to inspect, and casting the event would throw ClassCastException.
+ query.setFinishTime();
+ query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+ return QueryState.QUERY_KILLED;
+ }
+
+ QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent;
+ QueryState finalState;
+
+ if (stageEvent.getState() == StageState.SUCCEEDED) {
+ finalState = finalizeQuery(query, stageEvent);
+ } else if (stageEvent.getState() == StageState.FAILED) {
+ finalState = QueryState.QUERY_FAILED;
+ } else if (stageEvent.getState() == StageState.KILLED) {
+ finalState = QueryState.QUERY_KILLED;
+ } else {
+ finalState = QueryState.QUERY_ERROR;
+ }
+ if (finalState != QueryState.QUERY_SUCCEEDED) {
+ Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
+ if (lastStage != null && lastStage.getTableMeta() != null) {
+ StoreType storeType = lastStage.getTableMeta().getStoreType();
+ if (storeType != null) {
+ LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+ try {
+ StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
+ } catch (IOException e) {
+ // Cleanup is best effort; the query has already failed.
+ LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ // Stamp the finish time before publishing the completion event, as the
+ // other terminal transitions do, so a handler never reads a stale value.
+ query.setFinishTime();
+ query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+
+ return finalState;
+ }
+
+ /**
+ * Commits the output of the last stage through the storage manager and
+ * then runs the eligible post-query hooks.
+ *
+ * @return QUERY_SUCCEEDED, or QUERY_ERROR when the commit or a hook throws;
+ * in that case the stack trace is published as a diagnostics update
+ */
+ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
+ ExecutionBlockId finalBlockId = event.getExecutionBlockId();
+ Stage lastStage = query.getStage(finalBlockId);
+ StoreType storeType = lastStage.getTableMeta().getStoreType();
+ try {
+ MasterPlan masterPlan = lastStage.getMasterPlan();
+ LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot();
+ CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+ TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+
+ StorageManager storageManager = StorageManager.getStorageManager(query.systemConf, storeType);
+ QueryContext queryContext = query.context.getQueryContext();
+ Path finalOutputDir = storageManager.commitOutputData(queryContext,
+ lastStage.getId(), masterPlan.getLogicalPlan(), lastStage.getSchema(), tableDesc);
+
+ new QueryHookExecutor(query.context.getQueryMasterContext())
+ .execute(queryContext, query, finalBlockId, finalOutputDir);
+ } catch (Exception e) {
+ String stackTrace = ExceptionUtils.getStackTrace(e);
+ query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, stackTrace));
+ return QueryState.QUERY_ERROR;
+ }
+
+ return QueryState.QUERY_SUCCEEDED;
+ }
+
+ /**
+ * A post-processing step run after the output of a successful query has
+ * been committed. QueryHookExecutor calls execute() only on hooks whose
+ * isEligible() returned true for the same arguments.
+ */
+ private static interface QueryHook {
+ /** Tells whether this hook applies to the final execution block of the query. */
+ boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
+ /** Performs the hook; an exception thrown here propagates to the caller of the executor. */
+ void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
+ ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
+ }
+
+ /**
+ * Holds the fixed list of post-query hooks and runs, in registration
+ * order, every hook that declares itself eligible.
+ */
+ private class QueryHookExecutor {
+ private List<QueryHook> hookList = TUtil.newList();
+ private QueryMaster.QueryMasterContext context;
+
+ public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
+ this.context = context;
+ // Registration order is the execution order.
+ this.hookList.add(new MaterializedResultHook());
+ this.hookList.add(new CreateTableHook());
+ this.hookList.add(new InsertTableHook());
++ this.hookList.add(new CreateIndexHook());
+ }
+
+ public void execute(QueryContext queryContext, Query query,
+ ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) throws Exception {
+ Iterator<QueryHook> hooks = hookList.iterator();
+ while (hooks.hasNext()) {
+ QueryHook candidate = hooks.next();
+ if (!candidate.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
+ continue;
+ }
+ candidate.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
+ }
+ }
++ }
++
++ /**
++ * Registers the index built by a CREATE INDEX query in the catalog.
++ */
++ private class CreateIndexHook implements QueryHook {
++
++ @Override
++ public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
++ NodeType finalNodeType = query.getStage(finalExecBlockId).getBlock().getPlan().getType();
++ return NodeType.CREATE_INDEX == finalNodeType;
++ }
++
++ @Override
++ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
++ CatalogService catalog = context.getWorkerContext().getCatalog();
++ Stage lastStage = query.getStage(finalExecBlockId);
++ CreateIndexNode createIndexNode = (CreateIndexNode) lastStage.getBlock().getPlan();
++ String givenIndexName = createIndexNode.getIndexName();
++
++ // Resolve database, simple and qualified index name; an unqualified
++ // name is placed in the current database of the session.
++ final String databaseName;
++ final String simpleIndexName;
++ final String qualifiedIndexName;
++ if (CatalogUtil.isFQTableName(givenIndexName)) {
++ String[] nameParts = CatalogUtil.splitFQTableName(givenIndexName);
++ databaseName = nameParts[0];
++ simpleIndexName = nameParts[1];
++ qualifiedIndexName = givenIndexName;
++ } else {
++ databaseName = queryContext.getCurrentDatabase();
++ simpleIndexName = givenIndexName;
++ qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
++ }
++
++ ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN);
++ if (scanNode == null) {
++ throw new IOException("Cannot find the table of the relation");
++ }
++
++ IndexDesc indexDesc = new IndexDesc(databaseName, scanNode.getTableName(),
++ simpleIndexName, createIndexNode.getIndexPath(),
++ createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(),
++ createIndexNode.isUnique(), false, scanNode.getLogicalSchema());
++ if (!catalog.createIndex(indexDesc)) {
++ LOG.info("Index creation " + qualifiedIndexName + " is failed.");
++ throw new CatalogException("Cannot create index \"" + qualifiedIndexName + "\".");
++ }
++ LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + ".");
++ }
+ }
+
+ /**
+ * Publishes the committed output directory as the result table of the
+ * query. Applies to every final plan node other than CREATE_TABLE and
+ * INSERT.
+ */
+ private class MaterializedResultHook implements QueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) {
+ NodeType type = query.getStage(finalExecBlockId).getBlock().getPlan().getType();
+ return !(type == NodeType.CREATE_TABLE || type == NodeType.INSERT);
+ }
+
+ @Override
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+ Query query, ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) throws Exception {
+ Stage lastStage = query.getStage(finalExecBlockId);
+
+ // Carry the session's NULL representation over to the result table.
+ TableMeta meta = lastStage.getTableMeta();
+ meta.putOption(StorageConstants.TEXT_NULL, queryContext.get(SessionVars.NULL_CHAR));
+
+ TableStats stats = lastStage.getResultStats();
+
+ String resultTableName = query.getId().toString();
+ TableDesc resultTableDesc =
+ new TableDesc(resultTableName, lastStage.getSchema(), meta, finalOutputDir.toUri());
+ resultTableDesc.setExternal(true);
+
+ long outputBytes = getTableVolume(query.systemConf, finalOutputDir);
+ stats.setNumBytes(outputBytes);
+ resultTableDesc.setStats(stats);
+ query.setResultDesc(resultTableDesc);
+ }
+ }
+
+ /**
+ * Registers the table produced by a CREATE TABLE query in the catalog and
+ * publishes its descriptor as the query result.
+ */
+ private class CreateTableHook implements QueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) {
+ NodeType finalNodeType = query.getStage(finalExecBlockId).getBlock().getPlan().getType();
+ return NodeType.CREATE_TABLE == finalNodeType;
+ }
+
+ @Override
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+ Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+ CatalogService catalog = context.getWorkerContext().getCatalog();
+ Stage lastStage = query.getStage(finalExecBlockId);
+ TableStats stats = lastStage.getResultStats();
+
+ CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
+ TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions());
+
+ TableDesc newTable = new TableDesc(createTableNode.getTableName(),
+ createTableNode.getTableSchema(), meta, finalOutputDir.toUri());
+ newTable.setExternal(createTableNode.isExternal());
+ if (createTableNode.hasPartition()) {
+ newTable.setPartitionMethod(createTableNode.getPartitionMethod());
+ }
+
+ long outputBytes = getTableVolume(query.systemConf, finalOutputDir);
+ stats.setNumBytes(outputBytes);
+ newTable.setStats(stats);
+ query.setResultDesc(newTable);
+
+ catalog.createTable(newTable);
+ }
+ }
+
+ /**
+ * Finishes an INSERT query. When the insert targets a catalog table, its
+ * statistics are updated in the catalog; otherwise a descriptor named after
+ * the query id is built for the output directory. Either way the descriptor
+ * becomes the query result.
+ */
+ private class InsertTableHook implements QueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+ Path finalOutputDir) {
+ NodeType finalNodeType = query.getStage(finalExecBlockId).getBlock().getPlan().getType();
+ return NodeType.INSERT == finalNodeType;
+ }
+
+ @Override
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+ Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
+ throws Exception {
+
+ CatalogService catalog = context.getWorkerContext().getCatalog();
+ Stage lastStage = query.getStage(finalExecBlockId);
+ TableMeta meta = lastStage.getTableMeta();
+ TableStats stats = lastStage.getResultStats();
+
+ InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
+ boolean intoCatalogTable = insertNode.hasTargetTable();
+
+ TableDesc finalTable = intoCatalogTable
+ ? catalog.getTableDesc(insertNode.getTableName())
+ : new TableDesc(query.getId().toString(), lastStage.getSchema(), meta, finalOutputDir.toUri());
+
+ stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+ finalTable.setStats(stats);
+
+ if (intoCatalogTable) {
+ // Push the refreshed statistics of the target table to the catalog.
+ UpdateTableStatsProto.Builder statsUpdate = UpdateTableStatsProto.newBuilder();
+ statsUpdate.setTableName(finalTable.getName());
+ statsUpdate.setStats(stats.getProto());
+ catalog.updateTableStats(statsUpdate.build());
+ }
+
+ query.setResultDesc(finalTable);
+ }
+ }
+ }
+
+ /**
+ * Returns the length reported by the content summary of {@code tablePath}
+ * on the file system that owns the path.
+ *
+ * @throws IOException if the file system or the summary cannot be obtained
+ */
+ public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+ return tablePath.getFileSystem(systemConf).getContentSummary(tablePath).getLength();
+ }
+
+ /**
+ * Handles STAGE_COMPLETED: updates the per-outcome stage counters and then
+ * either schedules the next execution block or publishes a
+ * QueryCompletedEvent carrying the state of the stage that just finished.
+ */
+ public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+
+ // True when the block under the cursor is not the terminal block of the plan.
+ private boolean hasNext(Query query) {
+ ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+ ExecutionBlock nextBlock = cursor.peek();
+ return !query.getPlan().isTerminal(nextBlock);
+ }
+
+ // Advances the cursor, wraps the block in a new Stage with the next
+ // priority value, registers it with the query and sends it SQ_INIT.
+ private void executeNextBlock(Query query) {
+ ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+ ExecutionBlock nextBlock = cursor.nextBlock();
+ Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
+ nextStage.setPriority(query.priority--);
+ query.addStage(nextStage);
+ nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
+
+ LOG.info("Scheduling Stage:" + nextStage.getId());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
+ LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
+ }
+ }
+
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ try {
+ query.completedStagesCount++;
+ StageCompletedEvent castEvent = (StageCompletedEvent) event;
+
+ if (castEvent.getState() == StageState.SUCCEEDED) {
+ query.successedStagesCount++;
+ } else if (castEvent.getState() == StageState.KILLED) {
+ query.killedStagesCount++;
+ } else if (castEvent.getState() == StageState.FAILED) {
+ query.failedStagesCount++;
+ } else if (castEvent.getState() == StageState.ERROR) {
+ query.erroredStagesCount++;
+ } else {
+ // Unexpected stage state: report an internal error. Control still
+ // falls through to the completion handling below.
+ LOG.error(String.format("Invalid Stage (%s) State %s at %s",
+ castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name()));
+ query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+ }
+
+ // if a stage is succeeded and a query is running
+ if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded
+ query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR.
+ hasNext(query)) { // there remains at least one stage.
+ query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
+ executeNextBlock(query);
+ } else { // if a query is completed due to finished, kill, failure, or error
+ query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+ }
+ } catch (Throwable t) {
+ // Any failure while handling the completion becomes an INTERNAL_ERROR event.
+ LOG.error(t.getMessage(), t);
+ query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+ }
+ }
+ }
+
+ /** Handles DIAGNOSTIC_UPDATE: appends the message carried by the event to the query diagnostics. */
+ private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ QueryDiagnosticsUpdateEvent update = (QueryDiagnosticsUpdateEvent) event;
+ query.addDiagnostic(update.getDiagnosticUpdate());
+ }
+ }
+
+ /** Handles KILL for a query still in QUERY_NEW: stamps the finish time and reports completion. */
+ private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ query.setFinishTime();
+ QueryMasterQueryCompletedEvent completion = new QueryMasterQueryCompletedEvent(query.getId());
+ query.eventHandler.handle(completion);
+ }
+ }
+
+ /** Handles KILL for a running query: sends SQ_KILL to every stage registered so far. */
+ private static class KillAllStagesTransition implements SingleArcTransition<Query, QueryEvent> {
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ synchronized (query.stages) {
+ Iterator<Stage> stageIterator = query.stages.values().iterator();
+ while (stageIterator.hasNext()) {
+ ExecutionBlockId stageId = stageIterator.next().getId();
+ query.eventHandler.handle(new StageEvent(stageId, StageEventType.SQ_KILL));
+ }
+ }
+ }
+ }
+
+ /** Handles INTERNAL_ERROR: stamps the finish time and reports completion to the query master. */
+ private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ query.setFinishTime();
+ QueryMasterQueryCompletedEvent completion = new QueryMasterQueryCompletedEvent(query.getId());
+ query.eventHandler.handle(completion);
+ }
+ }
+
+ /**
+ * Feeds {@code event} to the state machine while holding the write lock
+ * and refreshes the state returned by {@link #getState()}.
+ *
+ * An event that is not valid in the current state is logged and answered
+ * with an INTERNAL_ERROR event; the cached state is left untouched in that
+ * case. A state change is logged.
+ */
+ @Override
+ public void handle(QueryEvent event) {
+ LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+ // Acquire the lock before entering the try block: if lock() itself failed
+ // inside it, the finally clause would unlock a lock this thread never held.
+ writeLock.lock();
+ try {
+ QueryState oldState = getSynchronizedState();
+ try {
+ getStateMachine().doTransition(event.getType(), event);
+ queryState = getSynchronizedState();
+ } catch (InvalidStateTransitonException e) {
+ LOG.error("Can't handle this event at current state"
+ + ", type:" + event
+ + ", oldState:" + oldState.name()
+ + ", nextState:" + getSynchronizedState().name()
+ , e);
+ eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
+ }
+
+ // Log the state change, if any.
+ QueryState newState = getSynchronizedState();
+ if (oldState != newState) {
+ LOG.info(id + " Query Transitioned from " + oldState + " to " + newState);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ }